1. Filter abnormal data 2. Optimize the sink write code 3. Optimize the ClickHouse configuration

zhanghongqing
2022-09-23 17:10:05 +08:00
parent 25e5b51766
commit 9cdfe060cf
31 changed files with 396 additions and 1001 deletions

pom.xml

@@ -6,7 +6,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>knowledge-log</artifactId>
<version>20220901</version>
<version>20220923</version>
<name>log-completion-schema</name>
<url>http://www.example.com</url>
@@ -218,7 +218,7 @@
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.2.4</version>
<version>0.3.1-patch</version>
</dependency>
<!-- <dependency>
<groupId>org.apache.flink</groupId>
@@ -230,16 +230,11 @@
<artifactId>arangodb-java-driver</artifactId>
<version>6.6.3</version>
</dependency>
<!-- <dependency>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>3.2.0</version>
</dependency>-->
<!-- <dependency>-->
<!-- <groupId>ru.ivi.opensource</groupId>-->
<!-- <artifactId>flink-clickhouse-sink</artifactId>-->
<!-- <version>1.3.3</version>-->
<!-- </dependency>-->
</dependency>
</dependencies>
</project>


@@ -45,3 +45,15 @@ log.transform.type=1
#Maximum time between two outputs (in milliseconds)
buffer.timeout=15000
### datasource-pool
spring.datasource.hikari.minimum-idle=1
spring.datasource.hikari.maximum-pool-size=10
spring.datasource.hikari.auto-commit=true
spring.datasource.hikari.idle-timeout=10000
spring.datasource.hikari.max-lifetime=30000
spring.datasource.hikari.connection-timeout=30000
spring.datasource.hikari.connection-test-query=SELECT 1
ck.max.threads=20
#In seconds; health-check interval
ck.schedule.actualization=20
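
These keys read like Spring Boot's Hikari namespace, but here they are consumed by the project's own config loader. As a point of reference, a minimal sketch (assuming the stock HikariCP 3.x API that the pom now uncomments) of how values like these map onto a pool configuration; the wrapper class is illustrative, and the literal values are copied from the keys above.

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;

// Sketch only: shows how the datasource-pool keys above would typically be
// applied through HikariCP itself. Not the project's code.
public class PoolConfigSketch {
    public static HikariDataSource build() {
        HikariConfig conf = new HikariConfig();
        conf.setMinimumIdle(1);                    // spring.datasource.hikari.minimum-idle
        conf.setMaximumPoolSize(10);               // spring.datasource.hikari.maximum-pool-size
        conf.setAutoCommit(true);                  // spring.datasource.hikari.auto-commit
        conf.setIdleTimeout(10_000);               // spring.datasource.hikari.idle-timeout (ms)
        conf.setMaxLifetime(30_000);               // spring.datasource.hikari.max-lifetime (ms)
        conf.setConnectionTimeout(30_000);         // spring.datasource.hikari.connection-timeout (ms)
        conf.setConnectionTestQuery("SELECT 1");   // spring.datasource.hikari.connection-test-query
        return new HikariDataSource(conf);
    }
}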


@@ -1,14 +1,14 @@
#--------------------------------Address configuration------------------------------#
#Management Kafka address; join multiple hosts with commas: ip1:9094,ip2:9094
source.kafka.servers=192.168.44.12:9094
source.kafka.servers=192.168.44.85:9094
#Management output Kafka address
sink.kafka.servers=192.168.44.12:9094
sink.kafka.servers=192.168.44.85:9094
#--------------------------------HTTP / geolocation library / SSL------------------------------#
tools.library=
#--------------------------------Kafka consumer/producer configuration------------------------------#
#Topic to read; the consumer offset for this spout id is stored and can be named after the topology; it determines where offsets are kept so the next read does not re-read data
group.id=KNOWLEDGE-GROUPtest
group.id=KNOWLEDGE-GROUP-20220905
#--------------------------------Topology configuration------------------------------#
#Consumer parallelism
source.parallelism=1
@@ -47,7 +47,7 @@ sink.arangodb.table.r.nx.domain2domain=R_NX_DOMAIN2DOMAIN
#Use Flink to insert raw logs into the database; 0 = no, 1 = yes
sink.ck.raw.log.insert.open=1
#ClickHouse configuration; join multiple hosts with commas: ip1:8123,ip2:8123
ck.hosts=192.168.44.12:8123
ck.hosts=192.168.44.85:8123
# ,192.168.44.86:8123,192.168.44.87:8123
ck.database=tsg_galaxy_v3
ck.username=tsg_insert
@@ -56,19 +56,19 @@ ck.pin=galaxy2019
ck.connection.timeout=10000
ck.socket.timeout=600000
#ClickHouse insert batch size, in rows
ck.batch=20000
ck.batch=100000
#Time to accumulate a batch before inserting into ClickHouse, in milliseconds
sink.ck.batch.delay.time=3000
sink.ck.batch.delay.time=30000
#Flink log delay timeout
flink.watermark.max.delay.time=60
#CK relation aggregation interval, in seconds
log.aggregate.duration=10
log.aggregate.duration=5
#ArangoDB aggregation interval, in seconds
log.aggregate.duration.graph=30
log.aggregate.duration.graph=10
#ArangoDB parameter configuration
arangodb.host=192.168.44.12
arangodb.host=192.168.44.83
arangodb.port=8529
arangodb.user=root
arangodb.password=galaxy_2019
@@ -79,4 +79,7 @@ arangodb.thread.pool.number=10
#Batch accumulation time, in milliseconds
sink.arangodb.batch.delay.time=1000
aggregate.max.value.length=18
aggregate.max.value.length=18
#Whether to write the ip2ip table; 1 = yes
sink.arangodb.raw.log.insert.open=0
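
Note how ck.batch=100000 and sink.ck.batch.delay.time=30000 form a size-or-time flush policy: a batch is written when either threshold is reached first, which is the pattern the CKSink and AGSink classes further down implement. A self-contained sketch of that policy (class, field, and method names are illustrative, not the project's code):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative flush policy: flush when the row threshold is hit, and also
// on a fixed timer so small trickles are not held forever.
public class BatchPolicySketch {
    private final int batchSize = 100_000;   // ck.batch
    private final long delayMs = 30_000;     // sink.ck.batch.delay.time
    private final List<Object> buffer = new ArrayList<>();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

    public BatchPolicySketch() {
        // time-triggered flush, independent of the size check below
        timer.scheduleWithFixedDelay(this::flush, delayMs, delayMs, TimeUnit.MILLISECONDS);
    }

    public synchronized void add(Object row) {
        buffer.add(row);
        if (buffer.size() >= batchSize) {
            flush();                         // size-triggered flush
        }
    }

    private synchronized void flush() {
        if (!buffer.isEmpty()) {
            // write the buffered rows to ClickHouse here, then clear the buffer
            buffer.clear();
        }
    }
}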


@@ -137,4 +137,16 @@ public class FlowWriteConfig {
public static final Integer CK_BATCH = FlowWriteConfigurations.getIntProperty(0, "ck.batch");
public static final Integer SINK_CK_RAW_LOG_INSERT_OPEN = FlowWriteConfigurations.getIntProperty(0, "sink.ck.raw.log.insert.open");
public static final Integer AGGREGATE_MAX_VALUE_LENGTH = FlowWriteConfigurations.getIntProperty(0, "aggregate.max.value.length");
public static final Integer SINK_ARANGODB_RAW_LOG_INSERT_OPEN = FlowWriteConfigurations.getIntProperty(0, "sink.arangodb.raw.log.insert.open");
public static final Integer HIKARI_MINIMUM_IDLE = FlowWriteConfigurations.getIntProperty(1, "spring.datasource.hikari.minimum-idle");
public static final Integer HIKARI_MAXIMUM_POOL_SIZE = FlowWriteConfigurations.getIntProperty(1, "spring.datasource.hikari.maximum-pool-size");
public static final Long HIKARI_IDLE_TIMEOUT = FlowWriteConfigurations.getLongProperty(1, "spring.datasource.hikari.idle-timeout");
public static final Long HIKARI_MAX_LIFETIME = FlowWriteConfigurations.getLongProperty(1, "spring.datasource.hikari.max-lifetime");
public static final Integer HIKARI_CONNECTION_TIMEOUT = FlowWriteConfigurations.getIntProperty(1, "spring.datasource.hikari.connection-timeout");
public static final Integer CK_MAX_THREADS = FlowWriteConfigurations.getIntProperty(1, "ck.max.threads");
public static final Integer CK_SCHEDULE_ACTUALIZATION = FlowWriteConfigurations.getIntProperty(1, "ck.schedule.actualization");
}
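
FlowWriteConfigurations itself is not part of this diff, so the following is only a guess at the kind of indexed two-file loader that getIntProperty(0, ...) / getIntProperty(1, ...) suggest; the resource file names, the index-to-file mapping, and the error handling are assumptions for illustration only.

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

// Hypothetical sketch of a two-file property loader (index 0 = the flow-write
// properties, index 1 = the datasource/ck properties added above).
public final class ConfigLoaderSketch {
    private static final Properties[] FILES = new Properties[2];

    static {
        FILES[0] = load("flow_write_config.properties"); // assumed file name
        FILES[1] = load("datasource.properties");        // assumed file name
    }

    private static Properties load(String name) {
        Properties p = new Properties();
        try (InputStream in = ConfigLoaderSketch.class.getClassLoader().getResourceAsStream(name)) {
            if (in != null) {
                p.load(in);
            }
        } catch (IOException ignored) {
            // fall through with empty properties
        }
        return p;
    }

    // Both methods assume the key is present in the selected file.
    public static Integer getIntProperty(int file, String key) {
        return Integer.valueOf(FILES[file].getProperty(key).trim());
    }

    public static Long getLongProperty(int file, String key) {
        return Long.valueOf(FILES[file].getProperty(key).trim());
    }
}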


@@ -1,23 +0,0 @@
package com.zdjizhi.etl;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
public class CKBatchWindow implements AllWindowFunction<Map<String, Object>, List<Map<String, Object>>, TimeWindow> {
@Override
public void apply(TimeWindow timeWindow, Iterable<Map<String, Object>> iterable, Collector<List<Map<String, Object>>> out) throws Exception {
Iterator<Map<String, Object>> iterator = iterable.iterator();
List<Map<String, Object>> batchLog = new ArrayList<>();
while (iterator.hasNext()) {
batchLog.add(iterator.next());
}
out.collect(batchLog);
}
}


@@ -1,131 +0,0 @@
package com.zdjizhi.etl;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
/**
 * Count window trigger with timeout
*/
public class CountTriggerWithTimeout<T> extends Trigger<T, TimeWindow> {
private static Log logger = LogFactory.get();
/**
* Maximum number of elements in the window
*/
private int maxCount;
/**
* event time / process time
*/
private TimeCharacteristic timeType;
private String stateName;
public String getStateName() {
return stateName;
}
public void setStateName(String stateName) {
this.stateName = stateName;
}
public CountTriggerWithTimeout(String stateName) {
this.stateName = stateName;
}
/**
* State used to track the window's current element count
*/
private ReducingStateDescriptor<Long> countStateDescriptor = new ReducingStateDescriptor(getStateName() + "counter", new Sum(), LongSerializer.INSTANCE);
public CountTriggerWithTimeout(String stateName, int maxCount, TimeCharacteristic timeType) {
this.maxCount = maxCount;
this.timeType = timeType;
this.stateName = stateName;
}
private TriggerResult fireAndPurge(TimeWindow window, TriggerContext ctx) throws Exception {
clear(window, ctx);
return TriggerResult.FIRE_AND_PURGE;
}
@Override
public TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
ReducingState<Long> countState = ctx.getPartitionedState(countStateDescriptor);
countState.add(1L);
if (countState.get() >= maxCount) {
logger.info("fire with count: " + countState.get());
return fireAndPurge(window, ctx);
}
if (timestamp >= window.getEnd()) {
logger.info("fire with tiem: " + timestamp);
return fireAndPurge(window, ctx);
} else {
return TriggerResult.CONTINUE;
}
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
if (timeType != TimeCharacteristic.ProcessingTime) {
return TriggerResult.CONTINUE;
}
if (time >= window.getEnd()) {
return TriggerResult.CONTINUE;
} else {
logger.debug("fire with process tiem: " + time);
return fireAndPurge(window, ctx);
}
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
if (timeType != TimeCharacteristic.EventTime) {
return TriggerResult.CONTINUE;
}
if (time >= window.getEnd()) {
return TriggerResult.CONTINUE;
} else {
logger.debug("fire with event tiem: " + time);
return fireAndPurge(window, ctx);
}
}
@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
ReducingState<Long> countState = ctx.getPartitionedState(countStateDescriptor);
countState.clear();
}
/**
* Counting (sum) function
*/
class Sum implements ReduceFunction<Long> {
@Override
public Long reduce(Long value1, Long value2) throws Exception {
return value1 + value2;
}
}
}


@@ -1,45 +1,16 @@
package com.zdjizhi.etl;
import cn.hutool.json.JSONUtil;
import com.zdjizhi.etl.connection.ArangodbBatchIPWindow;
import com.zdjizhi.utils.arangodb.ArangoDBSink;
import com.zdjizhi.utils.ck.CKSink;
import com.zdjizhi.utils.kafka.KafkaProducer;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.util.Map;
import static com.zdjizhi.common.FlowWriteConfig.*;
import static com.zdjizhi.common.FlowWriteConfig.SINK_PARALLELISM;
public class LogService {
// @Deprecated
// public static void getLogCKSink3(DataStream<Map<String, Object>> sourceStream, String sink) throws Exception {
//
// sourceStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
// .trigger(new CountTriggerWithTimeout<>(sink, CK_BATCH, TimeCharacteristic.ProcessingTime))
// .apply(new CKBatchWindow())
// .addSink(new ClickhouseSink(sink))
// .setParallelism(SINK_PARALLELISM)
// .name(sink)
// .setParallelism(SINK_PARALLELISM);
//
// }
public static void getLogArangoSink(DataStream<Map<String, Object>> sourceStream, String sink) throws Exception {
sourceStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_ARANGODB_BATCH_DELAY_TIME)))
.trigger(new CountTriggerWithTimeout<>(sink, ARANGODB_BATCH, TimeCharacteristic.ProcessingTime))
.apply(new ArangodbBatchIPWindow())
.addSink(new ArangoDBSink(sink))
.setParallelism(SINK_PARALLELISM)
.name(sink)
.setParallelism(SINK_PARALLELISM);
}
public static void getLogKafkaSink(DataStream<Map<String, Object>> sourceStream, String sink) throws Exception {
sourceStream.map(JSONUtil::toJsonStr)
.setParallelism(SINK_PARALLELISM)


@@ -1,36 +0,0 @@
package com.zdjizhi.etl.connection;
import cn.hutool.core.util.StrUtil;
import com.arangodb.entity.BaseEdgeDocument;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
public class ArangodbBatchIPWindow implements AllWindowFunction<Map<String, Object>, List<BaseEdgeDocument>, TimeWindow> {
@Override
public void apply(TimeWindow timeWindow, Iterable<Map<String, Object>> iterable, Collector<List<BaseEdgeDocument>> collector) throws Exception {
Iterator<Map<String, Object>> iterator = iterable.iterator();
List<BaseEdgeDocument> batchLog = new ArrayList<>();
while (iterator.hasNext()) {
Map<String, Object> next = iterator.next();
String srcIp = StrUtil.toString(next.get("src_ip"));
String dstIp = StrUtil.toString(next.get("dst_ip"));
BaseEdgeDocument baseEdgeDocument = new BaseEdgeDocument();
baseEdgeDocument.setKey(String.join("-", srcIp, dstIp));
baseEdgeDocument.setFrom("src_ip/" + srcIp);
baseEdgeDocument.setTo("dst_ip/" + dstIp);
baseEdgeDocument.addAttribute("src_ip", srcIp);
baseEdgeDocument.addAttribute("dst_ip", dstIp);
baseEdgeDocument.addAttribute("last_found_time", next.get("last_found_time"));
batchLog.add(baseEdgeDocument);
}
collector.collect(batchLog);
}
}


@@ -2,14 +2,14 @@ package com.zdjizhi.etl.connection;
import cn.hutool.core.convert.Convert;
import com.alibaba.fastjson.util.TypeUtils;
import com.arangodb.entity.BaseEdgeDocument;
import com.zdjizhi.etl.LogService;
import com.zdjizhi.etl.dns.SketchTimeMapFunction;
import com.zdjizhi.utils.arangodb.AGSink;
import com.zdjizhi.utils.kafka.KafkaConsumer;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.time.Duration;
@@ -36,16 +36,21 @@ public class ConnLogService {
// Write to the CK connection relation table
LogService.getLogCKSink(connTransformStream, SINK_CK_TABLE_RELATION_CONNECTION);
} else {
LogService.getLogKafkaSink(connSource, SINK_CK_TABLE_CONNECTION);
LogService.getLogKafkaSink(sketchSource, SINK_CK_TABLE_SKETCH);
LogService.getLogKafkaSink(connTransformStream, SINK_KAFKA_TOPIC_RELATION_CONNECTION);
}
DataStream<Map<String, Object>> sketchTransformStream = getSketchTransformStream(sketchSource);
if (SINK_ARANGODB_RAW_LOG_INSERT_OPEN == 1) {
// Merge the connection and connection-sketch streams
DataStream<Map<String, Object>> ip2ipGraph = getConnUnion(connTransformStream, sketchTransformStream);
DataStream<Map<String, Object>> sketchTransformStream = getSketchTransformStream(sketchSource);
// Write to ArangoDB
LogService.getLogArangoSink(ip2ipGraph, R_VISIT_IP2IP);
// Merge the connection and connection-sketch streams
DataStream<BaseEdgeDocument> ip2ipGraph = getConnUnion(connTransformStream, sketchTransformStream);
// Write to ArangoDB
ConnLogService.getLogArangoSink(ip2ipGraph, R_VISIT_IP2IP);
}
}
@@ -59,21 +64,24 @@ public class ConnLogService {
String timeFilter = SOURCE_KAFKA_TOPIC_CONNECTION.equals(source) ? "conn_start_time" : "sketch_start_time";
SingleOutputStreamOperator<Map<String, Object>> filterStream = env.addSource(KafkaConsumer.myDeserializationConsumer(source))
DataStream<Map<String, Object>> filterStream = env.addSource(KafkaConsumer.myDeserializationConsumer(source))
.setParallelism(SOURCE_PARALLELISM)
.filter(x -> {
if (Objects.isNull(x) || Convert.toLong(x.get(timeFilter)) <= 0) {
return false;
}
if (SOURCE_KAFKA_TOPIC_CONNECTION.equals(source)) {
if (String.valueOf(x.get("total_cs_pkts")).length() >= AGGREGATE_MAX_VALUE_LENGTH || String.valueOf(x.get("total_sc_pkts")).length() >= AGGREGATE_MAX_VALUE_LENGTH ||
String.valueOf(x.get("total_cs_bytes")).length() >= AGGREGATE_MAX_VALUE_LENGTH || String.valueOf(x.get("total_sc_bytes")).length() >= AGGREGATE_MAX_VALUE_LENGTH) {
if (TypeUtils.castToLong(x.get("total_cs_pkts")) < 0 || TypeUtils.castToLong(x.get("total_cs_pkts")) == Long.MAX_VALUE
|| TypeUtils.castToLong(x.get("total_sc_pkts")) < 0 || TypeUtils.castToLong(x.get("total_sc_pkts")) == Long.MAX_VALUE
|| TypeUtils.castToLong(x.get("total_cs_bytes")) < 0 || TypeUtils.castToLong(x.get("total_cs_bytes")) == Long.MAX_VALUE
|| TypeUtils.castToLong(x.get("total_sc_bytes")) < 0 || TypeUtils.castToLong(x.get("total_sc_bytes")) == Long.MAX_VALUE) {
return false;
}
return true;
} else if (SOURCE_KAFKA_TOPIC_SKETCH.equals(source)) {
if (String.valueOf(x.get("sketch_sessions")).length() >= AGGREGATE_MAX_VALUE_LENGTH || String.valueOf(x.get("sketch_packets")).length() >= AGGREGATE_MAX_VALUE_LENGTH ||
String.valueOf(x.get("sketch_bytes")).length() >= AGGREGATE_MAX_VALUE_LENGTH) {
if (TypeUtils.castToLong(x.get("sketch_sessions")) < 0 || TypeUtils.castToLong(x.get("sketch_sessions")) == Long.MAX_VALUE
|| TypeUtils.castToLong(x.get("sketch_packets")) < 0 || TypeUtils.castToLong(x.get("sketch_packets")) == Long.MAX_VALUE
|| TypeUtils.castToLong(x.get("sketch_bytes")) < 0 || TypeUtils.castToLong(x.get("sketch_bytes")) == Long.MAX_VALUE) {
return false;
}
return true;
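
This is the commit's abnormal-data filter: the old heuristic compared the string length of each counter against aggregate.max.value.length, while the new check rejects records whose counters cast to a negative value or to a saturated Long.MAX_VALUE. A standalone restatement of the connection-log predicate (the class and method names are illustrative; the authoritative logic is the lambda above):

import com.alibaba.fastjson.util.TypeUtils;

import java.util.Map;

// Sketch: a counter field is considered abnormal if the fastjson long cast
// yields null, a negative value, or a value saturated at Long.MAX_VALUE.
public class AbnormalValueFilter {
    private static final String[] CONN_FIELDS =
            {"total_cs_pkts", "total_sc_pkts", "total_cs_bytes", "total_sc_bytes"};

    public static boolean isValidConnLog(Map<String, Object> log) {
        for (String field : CONN_FIELDS) {
            Long v = TypeUtils.castToLong(log.get(field));
            if (v == null || v < 0 || v == Long.MAX_VALUE) {
                return false;   // drop abnormal records before aggregation
            }
        }
        return true;
    }
}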
@@ -96,8 +104,10 @@ public class ConnLogService {
}))
.setParallelism(TRANSFORM_PARALLELISM)
.keyBy(new IpKeysSelector())
.window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
.window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
.process(new ConnProcessFunction())
.setParallelism(TRANSFORM_PARALLELISM)
.filter(x -> Objects.nonNull(x) && TypeUtils.castToLong(x.get("sessions")) >= 0 && TypeUtils.castToLong(x.get("packets")) >= 0 && TypeUtils.castToLong(x.get("bytes")) >= 0)
.setParallelism(TRANSFORM_PARALLELISM);
return connTransformStream;
}
@@ -107,18 +117,28 @@ public class ConnLogService {
.<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
.withTimestampAssigner((event, timestamp) -> TypeUtils.castToLong(event.get("sketch_start_time")) * 1000))
.keyBy(new IpKeysSelector())
.window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
.process(new SketchProcessFunction());
.window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
.process(new SketchProcessFunction())
.setParallelism(TRANSFORM_PARALLELISM)
.filter(x -> Objects.nonNull(x) && TypeUtils.castToLong(x.get("sessions")) >= 0 && TypeUtils.castToLong(x.get("packets")) >= 0 && TypeUtils.castToLong(x.get("bytes")) >= 0)
.setParallelism(TRANSFORM_PARALLELISM);
return sketchTransformStream;
}
private static DataStream<Map<String, Object>> getConnUnion(DataStream<Map<String, Object>> connTransformStream, DataStream<Map<String, Object>> sketchTransformStream) throws Exception {
DataStream<Map<String, Object>> ip2ipGraph = connTransformStream.union(sketchTransformStream)
private static DataStream<BaseEdgeDocument> getConnUnion(DataStream<Map<String, Object>> connTransformStream, DataStream<Map<String, Object>> sketchTransformStream) throws Exception {
DataStream<BaseEdgeDocument> ip2ipGraph = connTransformStream.union(sketchTransformStream)
.keyBy(new IpKeysSelector())
.window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
.window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
.process(new Ip2IpGraphProcessFunction())
.setParallelism(TRANSFORM_PARALLELISM);
return ip2ipGraph;
}
public static void getLogArangoSink(DataStream<BaseEdgeDocument> sourceStream, String sink) throws Exception {
sourceStream.addSink(new AGSink(sink))
.setParallelism(SINK_PARALLELISM)
.name(sink)
.setParallelism(SINK_PARALLELISM);
}
}


@@ -1,162 +0,0 @@
//package com.zdjizhi.etl.connection;
//
//import cn.hutool.core.convert.Convert;
//import cn.hutool.core.util.StrUtil;
//import com.zdjizhi.etl.LogService;
//import com.zdjizhi.etl.dns.SketchTimeMapFunction;
//import com.zdjizhi.utils.kafka.KafkaConsumer;
//import org.apache.flink.api.common.eventtime.WatermarkStrategy;
//import org.apache.flink.api.common.functions.MapFunction;
//import org.apache.flink.api.java.utils.ParameterTool;
//import org.apache.flink.streaming.api.datastream.DataStream;
//import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
//import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
//import org.apache.flink.streaming.api.windowing.time.Time;
//import ru.ivi.opensource.flinkclickhousesink.ClickHouseSink;
//import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseClusterSettings;
//import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseSinkConst;
//
//import java.time.Duration;
//import java.util.*;
//
//import static com.zdjizhi.common.FlowWriteConfig.*;
//
//
//public class ConnLogService2 {
//
// public static String convertToCsv(Map<String, Object> map) {
// List<Object> list = new ArrayList<>();
// list.add("(");
// for (Map.Entry<String, Object> m : map.entrySet()) {
// if (m.getValue() instanceof String) {
// list.add("'" + m.getValue() + "'");
// } else {
// list.add(m.getValue());
// }
// }
// list.add(")");
// String join = StrUtil.join(",", list);
// return join;
//
// }
//
// public static void connLogStream(StreamExecutionEnvironment env) throws Exception {
// //connection
// DataStream<Map<String, Object>> connSource = getLogSource(env, SOURCE_KAFKA_TOPIC_CONNECTION);
// //sketch
// DataStream<Map<String, Object>> sketchSource = getLogSource(env, SOURCE_KAFKA_TOPIC_SKETCH);
//
// // Write to the CK sink, batched
// LogService.getLogCKSink(connSource, SINK_CK_TABLE_CONNECTION);
//
// Map<String, String> globalParameters = new HashMap<>();
// // ClickHouse cluster properties
// globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_HOSTS, "http://bigdata-85:8123/");
// globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_USER, CK_USERNAME);
// globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_PASSWORD, CK_PIN);
//
// // sink common
// globalParameters.put(ClickHouseSinkConst.TIMEOUT_SEC, "1");
// globalParameters.put(ClickHouseSinkConst.FAILED_RECORDS_PATH, "d:/");
// globalParameters.put(ClickHouseSinkConst.NUM_WRITERS, "2");
// globalParameters.put(ClickHouseSinkConst.NUM_RETRIES, "2");
// globalParameters.put(ClickHouseSinkConst.QUEUE_MAX_CAPACITY, "2");
// globalParameters.put(ClickHouseSinkConst.IGNORING_CLICKHOUSE_SENDING_EXCEPTION_ENABLED, "false");
//
// // set global parameters
// ParameterTool parameters = ParameterTool.fromMap(globalParameters);
// env.getConfig().setGlobalJobParameters(parameters);
//
// // Transform step
// DataStream<String> dataStream = sketchSource.map(new MapFunction<Map<String, Object>, String>() {
// @Override
// public String map(Map<String, Object> data) throws Exception {
// String s = convertToCsv(data);
// System.err.println(s);
// return convertToCsv(data);
// }
// });
//
//
// // create props for sink
// Properties props = new Properties();
// props.put(ClickHouseSinkConst.TARGET_TABLE_NAME, CK_DATABASE + "." + SINK_CK_TABLE_SKETCH);
// props.put(ClickHouseSinkConst.MAX_BUFFER_SIZE, CK_BATCH);
// ClickHouseSink sink = new ClickHouseSink(props);
// dataStream.addSink(sink);
// dataStream.print();
//
//// LogService.getLogCKSink(sketchSource, SINK_CK_TABLE_SKETCH);
//
// //transform
// DataStream<Map<String, Object>> connTransformStream = getConnTransformStream(connSource);
//
// // Write to the CK connection relation table
// LogService.getLogCKSink(connTransformStream, SINK_CK_TABLE_RELATION_CONNECTION);
//
// DataStream<Map<String, Object>> sketchTransformStream = getSketchTransformStream(sketchSource);
//
// // Merge the connection and connection-sketch streams
// DataStream<Map<String, Object>> ip2ipGraph = getConnUnion(connTransformStream, sketchTransformStream);
//
// // Write to ArangoDB
// LogService.getLogArangoSink(ip2ipGraph, R_VISIT_IP2IP);
//
// }
//
// /**
// * Raw connection log source consumed from Kafka
// *
// * @param source
// * @return
// */
// private static DataStream<Map<String, Object>> getLogSource(StreamExecutionEnvironment env, String source) throws Exception {
//
// String timeFilter = SOURCE_KAFKA_TOPIC_CONNECTION.equals(source) ? "conn_start_time" : "sketch_start_time";
//
// DataStream<Map<String, Object>> sourceStream = env.addSource(KafkaConsumer.myDeserializationConsumer(source))
// .setParallelism(SOURCE_PARALLELISM)
// .filter(x -> Objects.nonNull(x) && Convert.toLong(x.get(timeFilter)) > 0)
// .setParallelism(SOURCE_PARALLELISM)
// .map(SOURCE_KAFKA_TOPIC_CONNECTION.equals(source) ? new ConnTimeMapFunction() : new SketchTimeMapFunction())
// .setParallelism(SOURCE_PARALLELISM)
// .name(source)
// .setParallelism(SOURCE_PARALLELISM);
// return sourceStream;
// }
//
// private static DataStream<Map<String, Object>> getConnTransformStream(DataStream<Map<String, Object>> connSource) throws Exception {
// DataStream<Map<String, Object>> connTransformStream = connSource
// .assignTimestampsAndWatermarks(WatermarkStrategy
// .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
// .withTimestampAssigner((event, timestamp) -> {
// return Convert.toLong(event.get("conn_start_time")) * 1000;
// }))
// .setParallelism(TRANSFORM_PARALLELISM)
// .keyBy(new IpKeysSelector())
// .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
// .process(new ConnProcessFunction())
// .setParallelism(TRANSFORM_PARALLELISM);
// return connTransformStream;
// }
//
// private static DataStream<Map<String, Object>> getSketchTransformStream(DataStream<Map<String, Object>> sketchSource) throws Exception {
// DataStream<Map<String, Object>> sketchTransformStream = sketchSource.assignTimestampsAndWatermarks(WatermarkStrategy
// .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
// .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("sketch_start_time")) * 1000))
// .keyBy(new IpKeysSelector())
// .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
// .process(new SketchProcessFunction());
// return sketchTransformStream;
// }
//
// private static DataStream<Map<String, Object>> getConnUnion(DataStream<Map<String, Object>> connTransformStream, DataStream<Map<String, Object>> sketchTransformStream) throws Exception {
// DataStream<Map<String, Object>> ip2ipGraph = connTransformStream.union(sketchTransformStream)
// .keyBy(new IpKeysSelector())
// .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
// .process(new Ip2IpGraphProcessFunction())
// .setParallelism(TRANSFORM_PARALLELISM);
// return ip2ipGraph;
// }
//
//}


@@ -11,7 +11,7 @@ import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import static com.zdjizhi.common.FlowWriteConfig.LOG_AGGREGATE_DURATION;
@@ -29,7 +29,7 @@ public class ConnProcessFunction extends ProcessWindowFunction<Map<String, Objec
try {
Tuple5<Long, Long, Long, Long, Long> values = connAggregate(elements);
if (values != null) {
Map<String, Object> result = new HashMap<>();
Map<String, Object> result = new LinkedHashMap<>();
result.put("start_time", values.f0);
result.put("end_time", values.f1);
result.put("src_ip", keys.f0);
@@ -53,16 +53,14 @@ public class ConnProcessFunction extends ProcessWindowFunction<Map<String, Objec
long endTime = DateUtil.currentSeconds();
try {
for (Map<String, Object> newSketchLog : elements) {
long connStartTimetime = Convert.toLong(newSketchLog.get("conn_start_time"));
if (connStartTimetime > 0) {
long connStartTime = Convert.toLong(newSketchLog.get("conn_start_time"));
if (connStartTime > 0) {
sessions++;
packets = packets + TypeUtils.castToLong(newSketchLog.get("total_cs_pkts")) + TypeUtils.castToLong(newSketchLog.get("total_sc_pkts"));
bytes = bytes + TypeUtils.castToLong(newSketchLog.get("total_cs_bytes")) + TypeUtils.castToLong(newSketchLog.get("total_sc_bytes"));
startTime = connStartTimetime < startTime ? connStartTimetime : startTime;
endTime = connStartTimetime > endTime ? connStartTimetime : endTime;
packets = packets > Long.MAX_VALUE ? 0 : packets;
bytes = bytes > Long.MAX_VALUE ? 0 : bytes;
startTime = Math.min(connStartTime, startTime);
endTime = Math.max(connStartTime, endTime);
}
}
return Tuple5.of(startTime, endTime + LOG_AGGREGATE_DURATION, sessions, packets, bytes);
@@ -71,4 +69,5 @@ public class ConnProcessFunction extends ProcessWindowFunction<Map<String, Objec
}
return null;
}
}


@@ -1,80 +0,0 @@
package com.zdjizhi.etl.connection;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.date.DateUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson.util.TypeUtils;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple5;
import java.util.Map;
/**
* @author 94976
*/
public class ConnReduceFunction implements ReduceFunction<Map<String, Object>> {
private static final Log logger = LogFactory.get();
//
// public void process(Tuple2<String, String> keys, Context context, Iterable<Map<String, Object>> elements, Collector<Map<String, Object>> out) {
// try {
// Tuple5<Long, Long, Long, Long, Long> values = sum(elements);
// if (values != null) {
// Map<String, Object> result = new HashMap<>();
// result.put("start_time", values.f0);
// result.put("end_time", values.f1);
// result.put("src_ip", keys.f0);
// result.put("dst_ip", keys.f1);
// result.put("sessions", values.f2);
// result.put("packets", values.f3);
// result.put("bytes", values.f4);
// out.collect(result);
// logger.debug("获取中间聚合结果:{}", result.toString());
// }
// } catch (Exception e) {
// logger.error("获取中间聚合结果失败,middleResult: {}", e);
// }
// }
//
private Tuple5<Long, Long, Long, Long, Long> sum(Map<String, Object> map1, Map<String, Object> map2) {
try {
long sessions = 0L;
long packets = 0L;
long bytes = 0L;
long startTime = DateUtil.currentSeconds();
long endTime = DateUtil.currentSeconds();
long connStartTime1 = Convert.toLong(map1.get("conn_start_time"));
long connStartTime2 = Convert.toLong(map2.get("conn_start_time"));
if (connStartTime1 > 0 && connStartTime2 > 0) {
sessions++;
packets = TypeUtils.castToLong(map1.get("total_cs_pkts")) + TypeUtils.castToLong(map1.get("total_sc_pkts")) +
TypeUtils.castToLong(map2.get("total_cs_pkts")) + TypeUtils.castToLong(map2.get("total_sc_pkts"));
bytes = bytes + TypeUtils.castToLong(map1.get("total_cs_bytes")) + TypeUtils.castToLong(map1.get("total_sc_bytes")) +
TypeUtils.castToLong(map2.get("total_cs_bytes")) + TypeUtils.castToLong(map2.get("total_sc_bytes"));
startTime = connStartTime1 < connStartTime2 ? connStartTime1 : connStartTime2;
endTime = connStartTime2 < connStartTime1 ? connStartTime1 : connStartTime2;
packets = packets > Long.MAX_VALUE ? 0 : packets;
bytes = bytes > Long.MAX_VALUE ? 0 : bytes;
}
} catch (Exception e) {
logger.error("聚合中间结果集失败 {}", e);
}
return null;
}
@Override
public Map<String, Object> reduce(Map<String, Object> map1, Map<String, Object> map2) throws Exception {
return null;
}
}


@@ -1,6 +1,7 @@
package com.zdjizhi.etl.connection;
import com.zdjizhi.utils.json.TypeUtils;
import com.alibaba.fastjson.util.TypeUtils;
import com.zdjizhi.utils.json.TypeUtil;
import org.apache.flink.api.common.functions.MapFunction;
import java.util.Map;
@@ -9,8 +10,13 @@ public class ConnTimeMapFunction implements MapFunction<Map<String, Object>, Map
@Override
public Map<String, Object> map(Map<String, Object> value) throws Exception {
value.put("conn_start_time", TypeUtils.coverMSToS(value.get("conn_start_time")));
value.put("log_gen_time", TypeUtils.coverMSToS(value.get("log_gen_time")));
value.put("conn_start_time", TypeUtil.coverMSToS(value.get("conn_start_time")));
value.put("log_gen_time", TypeUtil.coverMSToS(value.get("log_gen_time")));
value.put("total_cs_pkts", TypeUtils.castToLong(value.get("total_cs_pkts")));
value.put("total_sc_pkts", TypeUtils.castToLong(value.get("total_sc_pkts")));
value.put("total_cs_bytes", TypeUtils.castToLong(value.get("total_cs_bytes")));
value.put("total_sc_bytes", TypeUtils.castToLong(value.get("total_sc_bytes")));
return value;
}
}


@@ -2,42 +2,48 @@ package com.zdjizhi.etl.connection;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.HashUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.arangodb.entity.BaseEdgeDocument;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.HashMap;
import java.util.Map;
/**
* Deduplicate by IP pair
* Process the timestamps and convert to graph data
*/
public class Ip2IpGraphProcessFunction extends ProcessWindowFunction<Map<String, Object>, Map<String, Object>, Tuple2<String, String>, TimeWindow> {
public class Ip2IpGraphProcessFunction extends ProcessWindowFunction<Map<String, Object>, BaseEdgeDocument, Tuple2<String, String>, TimeWindow> {
private static final Log logger = LogFactory.get();
@Override
public void process(Tuple2<String, String> keys, Context context, Iterable<Map<String, Object>> elements, Collector<Map<String, Object>> out) {
public void process(Tuple2<String, String> keys, Context context, Iterable<Map<String, Object>> elements, Collector<BaseEdgeDocument> out) {
try {
long lastFoundTime = DateUtil.currentSeconds();;
long lastFoundTime = DateUtil.currentSeconds();
for (Map<String, Object> log : elements) {
long connStartTime = Convert.toLong(log.get("start_time"));
lastFoundTime = connStartTime > lastFoundTime ? connStartTime : lastFoundTime;
lastFoundTime = Math.max(connStartTime, lastFoundTime);
}
Map<String, Object> newLog = new HashMap<>();
newLog.put("src_ip", keys.f0);
newLog.put("dst_ip", keys.f1);
newLog.put("last_found_time", lastFoundTime);
out.collect(newLog);
logger.debug("获取中间聚合结果:{}", newLog.toString());
BaseEdgeDocument baseEdgeDocument = new BaseEdgeDocument();
baseEdgeDocument.setKey(String.valueOf(HashUtil.fnvHash(keys.f0 + keys.f1)));
baseEdgeDocument.setFrom("src_ip/" + keys.f0);
baseEdgeDocument.setTo("dst_ip/" + keys.f1);
baseEdgeDocument.addAttribute("src_ip", keys.f0);
baseEdgeDocument.addAttribute("dst_ip", keys.f1);
baseEdgeDocument.addAttribute("last_found_time", lastFoundTime);
out.collect(baseEdgeDocument);
logger.debug("获取中间聚合结果:{}", baseEdgeDocument.toString());
} catch (Exception e) {
logger.error("获取中间聚合结果失败,middleResult: {}", e);
logger.error("获取中间聚合结果失败,middleResult: {}", e.getMessage());
}
}


@@ -11,7 +11,7 @@ import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import static com.zdjizhi.common.FlowWriteConfig.LOG_AGGREGATE_DURATION;
@@ -42,7 +42,7 @@ public class SketchProcessFunction extends ProcessWindowFunction<Map<String, Obj
Tuple5<Long, Long, Long, Long, Long> values = connAggregate(elements);
try {
if (values != null) {
Map<String, Object> result = new HashMap<>();
Map<String, Object> result = new LinkedHashMap<>();
result.put("start_time", values.f0);
result.put("end_time", values.f1);
result.put("src_ip", keys.f0);
@@ -72,12 +72,9 @@ public class SketchProcessFunction extends ProcessWindowFunction<Map<String, Obj
sessions += TypeUtils.castToLong(newSketchLog.get("sketch_sessions"));
packets += TypeUtils.castToLong(newSketchLog.get("sketch_packets"));
bytes += TypeUtils.castToLong(newSketchLog.get("sketch_bytes"));
startTime = connStartTime < startTime ? connStartTime : startTime;
endTime = connStartTime > endTime ? connStartTime : endTime;
sessions = sessions > Long.MAX_VALUE ? 0 : sessions;
packets = packets > Long.MAX_VALUE ? 0 : packets;
bytes = bytes > Long.MAX_VALUE ? 0 : bytes;
startTime = Math.min(connStartTime, startTime);
endTime = Math.max(connStartTime, endTime);
}
}
return Tuple5.of(startTime, endTime + LOG_AGGREGATE_DURATION, sessions, packets, bytes);
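
The deleted sessions/packets/bytes guards compared a long against Long.MAX_VALUE with '>', which can never be true; on overflow a long wraps to a negative value, which the new post-window filters catch instead. If explicit overflow handling were ever wanted inside these accumulators, a sketch like this (not the project's code) would provide it:

// Sketch of overflow-aware accumulation using the JDK's Math.addExact, which
// throws ArithmeticException on long overflow instead of wrapping silently.
public final class SafeSum {
    private SafeSum() {
    }

    public static long addOrZero(long current, long increment) {
        try {
            return Math.addExact(current, increment);
        } catch (ArithmeticException overflow) {
            return 0L;   // mirror the old intent: reset the counter on overflow
        }
    }
}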


@@ -0,0 +1,21 @@
package com.zdjizhi.etl.connection;
import com.alibaba.fastjson.util.TypeUtils;
import com.zdjizhi.utils.json.TypeUtil;
import org.apache.flink.api.common.functions.MapFunction;
import java.util.Map;
public class SketchTimeMapFunction implements MapFunction<Map<String, Object>, Map<String, Object>> {
@Override
public Map<String, Object> map(Map<String, Object> value) throws Exception {
value.put("sketch_start_time", TypeUtil.coverMSToS(value.get("sketch_start_time")));
value.put("sketch_sessions", TypeUtils.castToLong(value.get("sketch_sessions")));
value.put("sketch_packets", TypeUtils.castToLong(value.get("sketch_packets")));
value.put("sketch_bytes", TypeUtils.castToLong(value.get("sketch_bytes")));
return value;
}
}


@@ -1,36 +0,0 @@
package com.zdjizhi.etl.dns;
import cn.hutool.core.util.StrUtil;
import com.arangodb.entity.BaseEdgeDocument;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
public class ArangodbBatchDnsWindow implements AllWindowFunction<Map<String, Object>, List<BaseEdgeDocument>, TimeWindow> {
@Override
public void apply(TimeWindow timeWindow, Iterable<Map<String, Object>> iterable, Collector<List<BaseEdgeDocument>> out) throws Exception {
Iterator<Map<String, Object>> iterator = iterable.iterator();
List<BaseEdgeDocument> batchLog = new ArrayList<>();
while (iterator.hasNext()) {
Map<String, Object> next = iterator.next();
String qname = StrUtil.toString(next.get("qname"));
String record = StrUtil.toString(next.get("record"));
BaseEdgeDocument baseEdgeDocument = new BaseEdgeDocument();
baseEdgeDocument.setKey(String.join("-", qname, record));
baseEdgeDocument.setFrom("qname/" + qname);
baseEdgeDocument.setTo("record/" + record);
baseEdgeDocument.addAttribute("qname", qname);
baseEdgeDocument.addAttribute("record", record);
baseEdgeDocument.addAttribute("last_found_time", next.get("last_found_time"));
batchLog.add(baseEdgeDocument);
}
out.collect(batchLog);
}
}


@@ -0,0 +1,28 @@
package com.zdjizhi.etl.dns;
import cn.hutool.core.util.HashUtil;
import cn.hutool.core.util.StrUtil;
import com.arangodb.entity.BaseEdgeDocument;
import org.apache.flink.api.common.functions.MapFunction;
import java.util.Map;
public class DnsGraphMapFunction implements MapFunction<Map<String, Object>, BaseEdgeDocument> {
@Override
public BaseEdgeDocument map(Map<String, Object> value) throws Exception {
String qname = StrUtil.toString(value.get("qname"));
String record = StrUtil.toString(value.get("record"));
BaseEdgeDocument baseEdgeDocument = new BaseEdgeDocument();
baseEdgeDocument.setKey(String.valueOf(HashUtil.fnvHash(qname + record)));
baseEdgeDocument.setFrom("qname/" + qname);
baseEdgeDocument.setTo("record/" + record);
baseEdgeDocument.addAttribute("qname", qname);
baseEdgeDocument.addAttribute("record", record);
baseEdgeDocument.addAttribute("last_found_time", value.get("last_found_time"));
return baseEdgeDocument;
}
}
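
A usage sketch of the new DnsGraphMapFunction, mirroring how DnsLogService wires it between the aggregated DNS stream and the ArangoDB sink later in this commit; the wrapper class and method are illustrative only.

import com.arangodb.entity.BaseEdgeDocument;
import com.zdjizhi.etl.dns.DnsGraphMapFunction;
import com.zdjizhi.utils.arangodb.AGSink;
import org.apache.flink.streaming.api.datastream.DataStream;

import java.util.Map;

// Sketch: convert aggregated DNS rows to edge documents, then hand them to
// the ArangoDB sink for the target collection.
public class DnsGraphWiringSketch {
    public static void wire(DataStream<Map<String, Object>> dnsGraph, String sinkCollection) {
        DataStream<BaseEdgeDocument> edges = dnsGraph.map(new DnsGraphMapFunction());
        edges.addSink(new AGSink(sinkCollection));
    }
}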


@@ -8,7 +8,7 @@ import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -28,7 +28,7 @@ public class DnsGraphProcessFunction extends ProcessWindowFunction<Map<String, O
Long startTime = Convert.toLong(log.get("start_time"));
tmpTime = startTime > tmpTime ? startTime : tmpTime;
}
Map newLog = new HashMap<>();
Map newLog = new LinkedHashMap<>();
newLog.put("record_type", keys.f0);
newLog.put("qname", keys.f1);
newLog.put("record", keys.f2);


@@ -2,13 +2,14 @@ package com.zdjizhi.etl.dns;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.util.ObjectUtil;
import com.arangodb.entity.BaseEdgeDocument;
import com.zdjizhi.enums.DnsType;
import com.zdjizhi.etl.LogService;
import com.zdjizhi.utils.arangodb.AGSink;
import com.zdjizhi.utils.kafka.KafkaConsumer;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
@@ -32,20 +33,24 @@ public class DnsLogService {
// Insert the split DNS relation logs into CK
LogService.getLogCKSink(dnsTransform, SINK_CK_TABLE_RELATION_DNS);
} else {
LogService.getLogCKSink(dnsTransform, SINK_KAFKA_TOPIC_RELATION_DNS);
LogService.getLogKafkaSink(dnsSource, SINK_CK_TABLE_DNS);
LogService.getLogKafkaSink(dnsTransform, SINK_KAFKA_TOPIC_RELATION_DNS);
}
// Insert into ArangoDB, split by record_type into different collections
DataStream<Map<String, Object>> dnsGraph = dnsTransform.filter(Objects::nonNull)
DataStream<Map<String, Object>> dnsGraph = dnsTransform
.keyBy(new DnsGraphKeysSelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
.process(new DnsGraphProcessFunction())
.setParallelism(SINK_PARALLELISM);
for (DnsType dnsEnum : DnsType.values()) {
DataStream<Map<String, Object>> dnsRecordData = dnsGraph.filter(x -> Objects.nonNull(x) && ObjectUtil.equal(dnsEnum.getType(), x.get("record_type")))
DataStream<BaseEdgeDocument> dnsRecordData = dnsGraph
.filter(x -> Objects.nonNull(x) && ObjectUtil.equal(dnsEnum.getType(), x.get("record_type")))
.setParallelism(SINK_PARALLELISM)
.map(new DnsGraphMapFunction())
.setParallelism(SINK_PARALLELISM);
LogService.getLogArangoSink(dnsRecordData, dnsEnum.getSink());
getLogArangoSink(dnsRecordData, dnsEnum.getSink());
}
}
@@ -64,6 +69,7 @@ public class DnsLogService {
private static DataStream<Map<String, Object>> getDnsTransformStream(DataStream<Map<String, Object>> dnsSource) throws Exception {
DataStream<Map<String, Object>> dnsTransform = dnsSource.filter(x -> Objects.nonNull(x.get("response")))
.setParallelism(SOURCE_PARALLELISM)
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
.withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("capture_time")) * 1000))
@@ -71,10 +77,17 @@ public class DnsLogService {
.flatMap(new DnsSplitFlatMapFunction())
.setParallelism(TRANSFORM_PARALLELISM)
.keyBy(new DnsGraphKeysSelector())
.window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
.window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
.process(new DnsRelationProcessFunction())
.setParallelism(TRANSFORM_PARALLELISM);
return dnsTransform;
}
public static void getLogArangoSink(DataStream<BaseEdgeDocument> sourceStream, String sink) throws Exception {
sourceStream.addSink(new AGSink(sink))
.setParallelism(SINK_PARALLELISM)
.name(sink)
.setParallelism(SINK_PARALLELISM);
}
}


@@ -6,7 +6,7 @@ import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.google.common.base.Joiner;
import com.zdjizhi.enums.DnsType;
import com.zdjizhi.utils.json.TypeUtils;
import com.zdjizhi.utils.json.TypeUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.MapFunction;
@@ -25,7 +25,7 @@ public class DnsMapFunction implements MapFunction<Map<String, Object>, Map<Stri
@Override
public Map<String, Object> map(Map<String, Object> rawLog) throws Exception {
try {
rawLog.put("capture_time", TypeUtils.coverMSToS(rawLog.get("capture_time")));
rawLog.put("capture_time", TypeUtil.coverMSToS(rawLog.get("capture_time")));
// Convert qname and record to lowercase
rawLog.put("qname", StringUtils.lowerCase(StrUtil.toString(rawLog.get("qname"))));
if (Objects.nonNull(rawLog.get("response"))) {


@@ -9,7 +9,7 @@ import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import static com.zdjizhi.common.FlowWriteConfig.LOG_AGGREGATE_DURATION;
@@ -45,7 +45,7 @@ public class DnsRelationProcessFunction extends ProcessWindowFunction<Map<String
endTime = logStartTime > endTime ? logStartTime : endTime;
}
}
Map<String, Object> newDns = new HashMap<>();
Map<String, Object> newDns = new LinkedHashMap<>();
newDns.put("start_time", startTime);
newDns.put("end_time", endTime + LOG_AGGREGATE_DURATION);
newDns.put("record_type", keys.f0);


@@ -1,15 +0,0 @@
package com.zdjizhi.etl.dns;
import com.zdjizhi.utils.json.TypeUtils;
import org.apache.flink.api.common.functions.MapFunction;
import java.util.Map;
public class SketchTimeMapFunction implements MapFunction<Map<String, Object>, Map<String, Object>> {
@Override
public Map<String, Object> map(Map<String, Object> value) throws Exception {
value.put("sketch_start_time", TypeUtils.coverMSToS(value.get("sketch_start_time")));
return value;
}
}


@@ -0,0 +1,137 @@
package com.zdjizhi.utils.arangodb;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.arangodb.entity.BaseEdgeDocument;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.io.IOException;
import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.*;
import static com.zdjizhi.common.FlowWriteConfig.ARANGODB_BATCH;
import static com.zdjizhi.common.FlowWriteConfig.SINK_ARANGODB_BATCH_DELAY_TIME;
public class AGSink extends RichSinkFunction<BaseEdgeDocument> {
private static final Log log = LogFactory.get();
private static final long serialVersionUID = 1L;
private static final Log logger = LogFactory.get();
// Buffer of edge documents waiting to be written out
private final List<BaseEdgeDocument> ipWithDataList;
// Data is flushed once this time interval has elapsed
private final long insertArangoTimeInterval = SINK_ARANGODB_BATCH_DELAY_TIME;
// Insert batch size
private final int insertArangoBatchSize = ARANGODB_BATCH; // 10 during development/testing
private static ArangoDBConnect arangoDBConnect;
private transient volatile boolean closed = false;
private transient ScheduledExecutorService scheduler;
private transient ScheduledFuture<?> scheduledFuture;
// Target collection name
private String sink;
public AGSink(String sink) {
this.sink = sink;
this.ipWithDataList = new CopyOnWriteArrayList<>();
}
public String getSink() {
return sink;
}
/**
* Connects to ArangoDB and starts the scheduled flush task.
*/
@Override
public void open(Configuration parameters) throws Exception {
arangoDBConnect = ArangoDBConnect.getInstance();
if (insertArangoTimeInterval != 0 && insertArangoBatchSize != 1) {
this.scheduler = Executors.newScheduledThreadPool(
1, new ExecutorThreadFactory("arangodb-upsert-output-format"));
this.scheduledFuture =
this.scheduler.scheduleWithFixedDelay(
() -> {
synchronized (AGSink.this) {
if (!closed) {
try {
logger.debug("arangodb_flush.............");
flush(ipWithDataList);
} catch (Exception e) {
log.error(e);
}
}
}
},
insertArangoTimeInterval,
insertArangoTimeInterval,
TimeUnit.MILLISECONDS);
}
}
@Override
public final synchronized void invoke(BaseEdgeDocument row, Context context) throws IOException {
ipWithDataList.add(row);
if (ipWithDataList.size() >= this.insertArangoBatchSize) {
try {
flush(ipWithDataList);
} catch (SQLException e) {
logger.error("ck sink invoke flush failed.", e);
}
}
}
// Flush buffered data
private synchronized void flush(List<BaseEdgeDocument> data) throws SQLException {
if (data.size() > 0) {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
log.debug("开始写入arangodb数据 {}", data.size());
arangoDBConnect.overwrite(data, sink);
stopWatch.stop();
log.debug("总共花费时间 {} ms", stopWatch.getTime());
log.debug("写入arangodb表{},数据 {}", sink, data.size());
ipWithDataList.clear();
}
}
/**
* Flushes any buffered documents and closes all resources of this instance.
*
* @throws IOException Thrown, if the input could not be closed properly.
*/
@Override
public synchronized void close() throws IOException {
if (!closed) {
closed = true;
if (this.scheduledFuture != null) {
scheduledFuture.cancel(false);
this.scheduler.shutdown();
}
if (arangoDBConnect != null) {
try {
flush(ipWithDataList);
} catch (SQLException e) {
log.error("JDBC statement could not be closed: " + e.getMessage());
} finally {
try {
arangoDBConnect.clean();
} catch (Exception e) {
log.error("JDBC connection could not be closed: " + e.getMessage());
}
}
}
}
}
}
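
AGSink buffers edge documents in a CopyOnWriteArrayList and flushes either when ARANGODB_BATCH records have accumulated or when the scheduled task fires after SINK_ARANGODB_BATCH_DELAY_TIME, so a windowAll/trigger stage in front of the sink is no longer needed. A usage sketch (the wrapper class is illustrative):

import com.arangodb.entity.BaseEdgeDocument;
import com.zdjizhi.utils.arangodb.AGSink;
import org.apache.flink.streaming.api.datastream.DataStream;

// Sketch: attach AGSink directly to a BaseEdgeDocument stream; batching by
// size and time happens inside the sink itself.
public class AgSinkWiringSketch {
    public static void attach(DataStream<BaseEdgeDocument> edges, String collection) {
        edges.addSink(new AGSink(collection))
             .name(collection);
    }
}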


@@ -1,23 +1,6 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.zdjizhi.utils.ck;
import cn.hutool.core.io.IoUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.enums.LogMetadata;
@@ -25,10 +8,10 @@ import org.apache.commons.lang3.time.StopWatch;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import ru.yandex.clickhouse.ClickHousePreparedStatement;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
@@ -44,20 +27,6 @@ public class CKSink extends RichSinkFunction<Map<String, Object>> {
private static final long serialVersionUID = 1L;
private static final Log logger = LogFactory.get();
private Connection connection;
private ClickHousePreparedStatement preparedStatement = null;
// Buffer of rows waiting to be written to ClickHouse
private final List<Map> ipWithDataList;
// Data is flushed once this time interval has elapsed
private final long insertCkTimeInterval = SINK_CK_BATCH_DELAY_TIME; // 4000L
// Insert batch size
private final int insertCkBatchSize = CK_BATCH; // 10 during development/testing
private transient volatile boolean closed = false;
private transient ScheduledExecutorService scheduler;
private transient ScheduledFuture<?> scheduledFuture;
private static final Map<String, String[]> logMetadataFields = new HashMap<>();
private static final Map<String, String> logMetadataSql = new HashMap<>();
@@ -68,6 +37,17 @@ public class CKSink extends RichSinkFunction<Map<String, Object>> {
}
}
// Buffer of rows waiting to be written to ClickHouse
private final CopyOnWriteArrayList<Map> ipWithDataList;
// Data is flushed once this time interval has elapsed
private final long insertCkTimeInterval = SINK_CK_BATCH_DELAY_TIME; // 4000L
// Insert batch size
private final int insertCkBatchSize = CK_BATCH; // 10 during development/testing
private Connection connection;
private PreparedStatement preparedStatement = null;
private transient volatile boolean closed = false;
private transient ScheduledExecutorService scheduler;
private transient ScheduledFuture<?> scheduledFuture;
// Target table name
private String sink;
@@ -88,19 +68,19 @@ public class CKSink extends RichSinkFunction<Map<String, Object>> {
connection = CKUtils.getConnection();
String sql = logMetadataSql.get(sink);
log.debug(sql);
preparedStatement = (ClickHousePreparedStatement) connection.prepareStatement(sql);
preparedStatement = connection.prepareStatement(sql);
if (insertCkTimeInterval != 0 && insertCkBatchSize != 1) {
this.scheduler = Executors.newScheduledThreadPool(
1, new ExecutorThreadFactory("jdbc-upsert-output-format"));
1, new ExecutorThreadFactory("ck-upsert-output-format"));
this.scheduledFuture =
this.scheduler.scheduleWithFixedDelay(
() -> {
synchronized (CKSink.this) {
if (!closed) {
try {
logger.info("ck_flush.............");
flushClose();
logger.debug("ck_flush.............");
flush(ipWithDataList);
} catch (Exception e) {
log.error(e);
}
@@ -115,7 +95,7 @@ public class CKSink extends RichSinkFunction<Map<String, Object>> {
}
@Override
public final synchronized void invoke(Map<String, Object> row, Context context) throws IOException {
public synchronized void invoke(Map<String, Object> row, Context context) throws IOException {
ipWithDataList.add(row);
/**
* 1. Write the data to CK
@@ -123,9 +103,10 @@ public class CKSink extends RichSinkFunction<Map<String, Object>> {
if (ipWithDataList.size() >= this.insertCkBatchSize) {
try {
flush(ipWithDataList);
logger.info("insertCkBatchSize");
log.debug("ck sink invoke flush ");
} catch (SQLException e) {
throw new RuntimeException("Preparation of JDBC statement failed.", e);
logger.error("ck sink invoke flush failed.", e);
}
}
@@ -134,35 +115,29 @@ public class CKSink extends RichSinkFunction<Map<String, Object>> {
// Flush buffered data
private synchronized void flush(List<Map> data) throws SQLException {
if (data.size() > 0) {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
log.info("开始写入ck数据 {}", data.size());
log.debug("ck sink flush "+data.size());
try {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
log.debug("开始写入ck数据 {}", data.size());
String[] logFields = logMetadataFields.get(sink);
connection.setAutoCommit(false);
for (Map<String, Object> map : data) {
String[] logFields = logMetadataFields.get(sink);
for (Map<String, Object> map : data) {
for (int i = 0; i < logFields.length; i++) {
preparedStatement.setObject(i + 1, map.get(logFields[i]));
for (int i = 0; i < logFields.length; i++) {
preparedStatement.setObject(i + 1, map.get(logFields[i]));
}
preparedStatement.addBatch();
}
preparedStatement.addBatch();
preparedStatement.executeBatch();
preparedStatement.clearBatch();
stopWatch.stop();
log.debug("总共花费时间 {} ms", stopWatch.getTime());
log.debug("写入ck表{},数据 {}", sink, data.size());
ipWithDataList.clear();
} catch (SQLException e) {
logger.error(e);
}
preparedStatement.executeBatch();
connection.commit();
preparedStatement.clearBatch();
stopWatch.stop();
log.info("总共花费时间 {} ms", stopWatch.getTime());
log.info("写入ck表{},数据 {}", sink, data.size());
ipWithDataList.clear();
}
}
private synchronized void flushClose() {
try {
flush(ipWithDataList);
} catch (SQLException e) {
log.error("Preparation of JDBC statement failed.", e);
}
}
@@ -179,23 +154,22 @@ public class CKSink extends RichSinkFunction<Map<String, Object>> {
scheduledFuture.cancel(false);
this.scheduler.shutdown();
}
if (preparedStatement != null) {
flushClose();
try {
preparedStatement.close();
log.debug("ck sink close flush ");
flush(ipWithDataList);
} catch (SQLException e) {
log.error("JDBC statement could not be closed: " + e.getMessage());
} finally {
preparedStatement = null;
try {
IoUtil.close(preparedStatement);
CKUtils.close(connection);
} catch (Exception e) {
log.error("JDBC connection could not be closed: " + e.getMessage());
}
}
}
try {
CKUtils.close(connection);
} catch (Exception e) {
log.error("JDBC connection could not be closed: " + e.getMessage());
}
}
}
}


@@ -3,6 +3,7 @@ package com.zdjizhi.utils.ck;
import cn.hutool.core.io.IoUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zaxxer.hikari.HikariDataSource;
import ru.yandex.clickhouse.BalancedClickhouseDataSource;
import ru.yandex.clickhouse.settings.ClickHouseProperties;
@@ -16,10 +17,8 @@ public class CKUtils {
private static final Log log = LogFactory.get();
private static Connection connection;
public static Connection getConnection() {
Connection connection = null;
try {
ClickHouseProperties props = new ClickHouseProperties();
props.setDatabase(CK_DATABASE);
@@ -27,19 +26,20 @@ public class CKUtils {
props.setPassword(CK_PIN);
props.setConnectionTimeout(CK_CONNECTION_TIMEOUT);
props.setSocketTimeout(CK_SOCKET_TIMEOUT);
props.setMaxThreads(50);
props.setMaxThreads(CK_MAX_THREADS);
BalancedClickhouseDataSource blDataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://" + CK_HOSTS, props);
blDataSource.actualize();
blDataSource.scheduleActualization(10, TimeUnit.SECONDS);// enable periodic health checks
blDataSource.scheduleActualization(CK_SCHEDULE_ACTUALIZATION, TimeUnit.SECONDS);// enable periodic health checks
// HikariConfig conf = new HikariConfig();
// conf.setDataSource(blDataSource);
// conf.setMinimumIdle(1);
// conf.setMaximumPoolSize(20);
//
// HikariDataSource hkDs = new HikariDataSource(conf);
connection = blDataSource.getConnection();
HikariDataSource hkDs = new HikariDataSource();
hkDs.setDataSource(blDataSource);
hkDs.setMinimumIdle(HIKARI_MINIMUM_IDLE);
hkDs.setMaximumPoolSize(HIKARI_MAXIMUM_POOL_SIZE);
hkDs.setMaxLifetime(HIKARI_MAX_LIFETIME);
hkDs.setIdleTimeout(HIKARI_IDLE_TIMEOUT);
connection = hkDs.getConnection();
log.debug("get clickhouse connection success");
} catch (SQLException e) {
log.error("clickhouse connection error ,{}", e);

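A short usage sketch of the pooled connection (not part of this commit). Wrapping BalancedClickhouseDataSource in HikariCP adds idle-timeout and max-lifetime recycling on top of the scheduled replica checks; the database, table and column below are placeholders, and CKUtils.close is assumed to simply close the pooled handle.

// Usage sketch only; database, table and column names are placeholders.
import java.sql.Connection;
import java.sql.PreparedStatement;
import com.zdjizhi.utils.ck.CKUtils;

public class CkConnectionUsageSketch {
    public static void main(String[] args) throws Exception {
        Connection conn = CKUtils.getConnection();    // HikariCP pool over BalancedClickhouseDataSource
        try (PreparedStatement ps = conn.prepareStatement(
                "INSERT INTO demo_db.demo_table (log_id) VALUES (?)")) {
            ps.setObject(1, 1L);
            ps.addBatch();
            ps.executeBatch();                        // one small batch, for illustration only
        } finally {
            CKUtils.close(conn);                      // hands the connection back to the pool
        }
    }
}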
View File

@@ -1,208 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.zdjizhi.utils.ck;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.enums.LogMetadata;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.yandex.clickhouse.ClickHousePreparedStatement;
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import static com.zdjizhi.common.FlowWriteConfig.CK_BATCH;
import static com.zdjizhi.common.FlowWriteConfig.SINK_CK_BATCH_DELAY_TIME;
public class ClickHouseJDBCOutput extends RichSinkFunction<Map<String, Object>> {
private static final Log log = LogFactory.get();
private static final long serialVersionUID = 1L;
private static final Logger logger = LoggerFactory.getLogger(ClickHouseJDBCOutput.class);
private Connection connection;
private ClickHousePreparedStatement preparedStatement = null;
// Binds buffered rows to the ClickHouse cluster IP they will be written out to
private final List<Map> ipWithDataList;
// Flush the buffered data once this time interval has elapsed
private final long insertCkTimeInterval = SINK_CK_BATCH_DELAY_TIME; // 4000L
// Rows per insert batch
private final int insertCkBatchSize = CK_BATCH; // 10 rows were used for development testing
private transient volatile boolean closed = false;
private transient ScheduledExecutorService scheduler;
private transient ScheduledFuture<?> scheduledFuture;
private static final Map<String, String[]> logMetadataFields = new HashMap<>();
private static final Map<String, String> logMetadataSql = new HashMap<>();
static {
for (LogMetadata value : LogMetadata.values()) {
logMetadataSql.put(value.getSink(), LogMetadata.preparedSql(value.getSink()));
logMetadataFields.put(value.getSink(), value.getFields());
}
}
// Target table name
private String sink;
public ClickHouseJDBCOutput(String sink) {
this.sink = sink;
this.ipWithDataList = new CopyOnWriteArrayList<>();
}
public String getSink() {
return sink;
}
/**
* Connects to the target database and initializes the prepared statement.
*/
@Override
public void open(Configuration parameters) throws Exception {
connection = CKUtils.getConnection();
String sql = logMetadataSql.get(sink);
log.debug(sql);
preparedStatement = (ClickHousePreparedStatement) connection.prepareStatement(sql);
if (insertCkTimeInterval != 0 && insertCkBatchSize != 1) {
this.scheduler = Executors.newScheduledThreadPool(
1, new ExecutorThreadFactory("jdbc-upsert-output-format"));
this.scheduledFuture =
this.scheduler.scheduleWithFixedDelay(
() -> {
synchronized (ClickHouseJDBCOutput.this) {
if (!closed) {
try {
logger.info("ck_flush.............");
flushClose();
} catch (Exception e) {
// flushException = e;
log.error(e);
}
}
}
},
insertCkTimeInterval,
insertCkTimeInterval,
TimeUnit.MILLISECONDS);
}
}
@Override
public final synchronized void invoke(Map<String, Object> row, Context context) throws IOException {
ipWithDataList.add(row);
/**
* 1. Write the batch to ClickHouse
*/
if (ipWithDataList.size() >= this.insertCkBatchSize) {
try {
flush(ipWithDataList);
logger.info("insertCkBatchSize");
} catch (SQLException e) {
throw new RuntimeException("Preparation of JDBC statement failed.", e);
}
}
}
// Insert a batch of rows
private synchronized void flush(List<Map> data) throws SQLException {
if (data.size() > 0) {
// checkFlushException();
StopWatch stopWatch = new StopWatch();
stopWatch.start();
log.info("开始写入ck数据 {}", data.size());
connection.setAutoCommit(false);
String[] logFields = logMetadataFields.get(sink);
for (Map<String, Object> map : data) {
for (int i = 0; i < logFields.length; i++) {
preparedStatement.setObject(i + 1, map.get(logFields[i]));
}
preparedStatement.addBatch();
}
preparedStatement.executeBatch();
connection.commit();
preparedStatement.clearBatch();
stopWatch.stop();
log.info("总共花费时间 {} ms", stopWatch.getTime());
log.info("写入ck表{},数据 {}", sink, data.size());
ipWithDataList.clear();
}
}
private synchronized void flushClose() {
try {
flush(ipWithDataList);
} catch (SQLException e) {
log.error("Preparation of JDBC statement failed.", e);
}
}
/**
* Executes prepared statement and closes all resources of this instance.
*
* @throws IOException Thrown, if the input could not be closed properly.
*/
@Override
public synchronized void close() throws IOException {
if (!closed) {
closed = true;
if (this.scheduledFuture != null) {
scheduledFuture.cancel(false);
this.scheduler.shutdown();
}
if (preparedStatement != null) {
flushClose();
try {
preparedStatement.close();
} catch (SQLException e) {
log.error("JDBC statement could not be closed: " + e.getMessage());
} finally {
preparedStatement = null;
}
}
try {
CKUtils.close(connection);
} catch (Exception e) {
log.error("JDBC connection could not be closed: " + e.getMessage());
}
}
}
}
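Both CKSink above and this now-deleted class flush on two triggers: when the buffer reaches CK_BATCH rows, or when the SINK_CK_BATCH_DELAY_TIME timer fires. A distilled sketch of that pattern follows, with stand-in types and placeholder values rather than the project's actual classes.

// Distilled sketch of the size-or-time flush trigger; buffer type and values are placeholders.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class BatchTriggerSketch {
    private final List<Object> buffer = new ArrayList<>();
    private final int batchSize = 10_000;  // placeholder; the sinks read this from CK_BATCH
    private final long delayMs = 5_000L;   // placeholder; the sinks read this from SINK_CK_BATCH_DELAY_TIME
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    public void start() {
        // Time trigger: flush whatever has accumulated, even if the batch is not full.
        scheduler.scheduleWithFixedDelay(this::flushSafely, delayMs, delayMs, TimeUnit.MILLISECONDS);
    }

    public synchronized void invoke(Object row) {
        buffer.add(row);
        // Size trigger: flush as soon as a full batch is buffered.
        if (buffer.size() >= batchSize) {
            flushSafely();
        }
    }

    private synchronized void flushSafely() {
        if (buffer.isEmpty()) {
            return;
        }
        // preparedStatement.executeBatch() against ClickHouse would go here.
        buffer.clear();
    }
}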

View File

@@ -1,106 +0,0 @@
//package com.zdjizhi.utils.ck;
//
//import cn.hutool.core.io.IoUtil;
//import cn.hutool.log.Log;
//import cn.hutool.log.LogFactory;
//import com.zdjizhi.enums.LogMetadata;
//import org.apache.commons.lang3.time.StopWatch;
//import org.apache.flink.configuration.Configuration;
//import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
//import ru.yandex.clickhouse.ClickHousePreparedStatement;
//
//import java.sql.Connection;
//import java.util.HashMap;
//import java.util.List;
//import java.util.Map;
//
//import static com.zdjizhi.common.FlowWriteConfig.CK_BATCH;
//
//public class ClickhouseSink2 extends RichSinkFunction<List<Map<String, Object>>> {
//
// private static final Log log = LogFactory.get();
//
// private Connection connection;
// private ClickHousePreparedStatement preparedStatement;
// public String sink;
//
// private static final Map<String, String[]> logMetadataFields = new HashMap<>();
// private static final Map<String, String> logMetadataSql = new HashMap<>();
//
// static {
// for (LogMetadata value : LogMetadata.values()) {
// logMetadataSql.put(value.getSink(), LogMetadata.preparedSql(value.getSink()));
// logMetadataFields.put(value.getSink(), value.getFields());
// }
// }
//
// public ClickhouseSink2(String sink) {
// this.sink = sink;
// }
//
// public String getSink() {
// return sink;
// }
//
// public void setSink(String sink) {
// this.sink = sink;
// }
//
// @Override
// public void invoke(List<Map<String, Object>> logs, Context context) throws Exception {
// executeInsert(logs, getSink());
// }
//
// @Override
// public void open(Configuration parameters) throws Exception {
// connection = CKUtils.getConnection();
// String sql = logMetadataSql.get(sink);
// log.debug(sql);
// connection.setAutoCommit(false);
// preparedStatement = (ClickHousePreparedStatement) connection.prepareStatement(sql);
// }
//
// @Override
// public void close() throws Exception {
// IoUtil.close(preparedStatement);
// CKUtils.close(connection);
// }
//
// public void executeInsert(List<Map<String, Object>> data, String tableName) {
//
// try {
// StopWatch stopWatch = new StopWatch();
// stopWatch.start();
// log.info("开始写入ck数据 {}", data.size());
//
// String[] logFields = logMetadataFields.get(tableName);
//
// int count = 0;
// for (Map<String, Object> map : data) {
// for (int i = 0; i < logFields.length; i++) {
// preparedStatement.setObject(i + 1, map.get(logFields[i]));
// }
// preparedStatement.addBatch();
// count++;
// if (count % CK_BATCH == 0) {
// preparedStatement.executeBatch();
// connection.commit();
// preparedStatement.clearBatch();
// count = 0;
// }
// }
// if (count > 0) {
// preparedStatement.executeBatch();
// connection.commit();
// preparedStatement.clearBatch();
// }
//
// stopWatch.stop();
// log.info("总共花费时间 {} ms", stopWatch.getTime());
// log.info("写入ck表{},数据 {}", tableName, data.size());
// } catch (Exception ex) {
// log.error("ClickhouseSink插入报错", ex);
// }
// }
//
//}

View File

@@ -79,7 +79,7 @@ public class JsonTypeUtil {
* @return Long value
*/
static long checkLongValue(Object value) {
Long longVal = TypeUtils.castToLong(value);
Long longVal = TypeUtil.castToLong(value);
if (longVal == null) {
return 0L;
@@ -99,7 +99,7 @@ public class JsonTypeUtil {
return null;
}
return TypeUtils.castToDouble(value);
return TypeUtil.castToDouble(value);
}
@@ -111,7 +111,7 @@ public class JsonTypeUtil {
*/
static int getIntValue(Object value) {
Integer intVal = TypeUtils.castToInt(value);
Integer intVal = TypeUtil.castToInt(value);
if (intVal == null) {
return 0;
}
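The renamed TypeUtil class (see the rename hunk below) is only partly visible in this diff. For reference, a null-tolerant cast along the following lines would fit the way JsonTypeUtil maps a null result to 0 above; this is an assumed sketch, not the project's actual implementation.

// Assumed sketch of a null-tolerant cast inside TypeUtil; the real method body may differ.
public static Long castToLong(Object value) {
    if (value == null) {
        return null;
    }
    if (value instanceof Number) {
        return ((Number) value).longValue();
    }
    try {
        return Long.parseLong(value.toString().trim());
    } catch (NumberFormatException e) {
        logger.error("castToLong failed for value: " + value);
        return null; // JsonTypeUtil.checkLongValue maps this to 0L
    }
}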

View File

@@ -16,7 +16,7 @@ import java.util.concurrent.TimeUnit;
* @Description:
* @date 2021/7/12 18:20
*/
public class TypeUtils {
public class TypeUtil {
private static final Log logger = LogFactory.get();
/**

View File

@@ -31,13 +31,11 @@ public class TimestampDeserializationSchema implements KafkaDeserializationSchem
@Override
@SuppressWarnings("unchecked")
public Map<String,Object> deserialize(ConsumerRecord record) throws Exception {
public Map<String, Object> deserialize(ConsumerRecord record) throws Exception {
if (record != null) {
try {
// long timestamp = record.timestamp() / 1000;
String value = new String((byte[]) record.value(), FlowWriteConfig.ENCODING);
Map<String, Object> data = (Map<String, Object>) JsonMapper.fromJsonString(value, Map.class);
// json.put("common_ingestion_time", timestamp);
return data;
} catch (RuntimeException e) {
logger.error("KafkaConsumer Deserialize failed,The exception is : " + e.getMessage());