Add ArangoDB ingestion operations
@@ -80,5 +80,26 @@ ck.pin=galaxy2019
#connection_record_log

flink.watermark.max.orderness=10
#s
log.aggregate.duration=30
#aggregation interval, in seconds
log.aggregate.duration=30
log.aggregate.duration.graph=30

#arangoDB connection settings
arangoDB.host=192.168.40.182
#arangoDB.host=192.168.40.224
arangoDB.port=8529
arangoDB.user=upsert
arangoDB.password=ceiec2018
arangoDB.DB.name=ip-learning-test
#arangoDB.DB.name=tsg_galaxy_v3
arangoDB.batch=100000
arangoDB.ttl=3600

arangoDB.read.limit=
update.arango.batch=10000

thread.pool.number=10
thread.await.termination.time=10

sink.batch.time.out=5
sink.batch=10000
@@ -2,6 +2,7 @@ package com.zdjizhi.common;


import com.zdjizhi.utils.system.FlowWriteConfigurations;
import org.apache.flink.configuration.ConfigUtils;
import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;

/**
@@ -108,24 +109,26 @@ public class FlowWriteConfig {
/**
 * common config
 */
public static final String SOURCE_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0,"source.kafka.servers");
public static final String SINK_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0,"sink.kafka.servers");
public static final String ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0,"zookeeper.servers");
public static final String TOOLS_LIBRARY = FlowWriteConfigurations.getStringProperty(0,"tools.library");
public static final String HBASE_ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0,"hbase.zookeeper.servers");
public static final String SOURCE_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "source.kafka.servers");
public static final String SINK_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "sink.kafka.servers");
public static final String ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "zookeeper.servers");
public static final String TOOLS_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "tools.library");
public static final String HBASE_ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "hbase.zookeeper.servers");


/*
 * ck
 * */
public static final String CK_HOSTS = FlowWriteConfigurations.getStringProperty(0,"ck.hosts");
public static final String CK_USERNAME = FlowWriteConfigurations.getStringProperty(0,"ck.username");
public static final String CK_PIN = FlowWriteConfigurations.getStringProperty(0,"ck.pin");
public static final String CK_DATABASE = FlowWriteConfigurations.getStringProperty(0,"ck.database");
 * ck
 * */
public static final String CK_HOSTS = FlowWriteConfigurations.getStringProperty(0, "ck.hosts");
public static final String CK_USERNAME = FlowWriteConfigurations.getStringProperty(0, "ck.username");
public static final String CK_PIN = FlowWriteConfigurations.getStringProperty(0, "ck.pin");
public static final String CK_DATABASE = FlowWriteConfigurations.getStringProperty(0, "ck.database");

public static final int FLINK_WATERMARK_MAX_ORDERNESS = FlowWriteConfigurations.getIntProperty(0,"flink.watermark.max.orderness");
public static final int LOG_AGGREGATE_DURATION = FlowWriteConfigurations.getIntProperty(0,"log.aggregate.duration");
public static final String SOURCE_KAFKA_TOPIC_DNS = FlowWriteConfigurations.getStringProperty(0, "source.kafka.topic.dns");;
public static final int FLINK_WATERMARK_MAX_ORDERNESS = FlowWriteConfigurations.getIntProperty(0, "flink.watermark.max.orderness");
public static final int LOG_AGGREGATE_DURATION = FlowWriteConfigurations.getIntProperty(0, "log.aggregate.duration");
public static final int LOG_AGGREGATE_DURATION_GRAPH = FlowWriteConfigurations.getIntProperty(0, "log.aggregate.duration.graph");
public static final String SOURCE_KAFKA_TOPIC_DNS = FlowWriteConfigurations.getStringProperty(0, "source.kafka.topic.dns");
;
public static final String SOURCE_KAFKA_TOPIC_CONNECTION = FlowWriteConfigurations.getStringProperty(0, "source.kafka.topic.connection");
public static final String SOURCE_KAFKA_TOPIC_SKETCH = FlowWriteConfigurations.getStringProperty(0, "source.kafka.topic.sketch");

@@ -135,4 +138,20 @@ public class FlowWriteConfig {
public static final String SINK_CK_TABLE_DNS = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.dns");
public static final String SINK_CK_TABLE_RELATION_CONNECTION = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.relation.connection");
public static final String SINK_CK_TABLE_RELATION_DNS = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.relation.dns");


public static final String ARANGODB_HOST = FlowWriteConfigurations.getStringProperty(0, "arangoDB.host");
public static final Integer ARANGODB_PORT = FlowWriteConfigurations.getIntProperty(0, "arangoDB.port");
public static final String ARANGODB_USER = FlowWriteConfigurations.getStringProperty(0, "arangoDB.user");
public static final String ARANGODB_PASSWORD = FlowWriteConfigurations.getStringProperty(0, "arangoDB.password");
public static final String ARANGODB_DB_NAME = FlowWriteConfigurations.getStringProperty(0, "arangoDB.DB.name");
public static final Integer ARANGODB_TTL = FlowWriteConfigurations.getIntProperty(0, "arangoDB.ttl");
public static final Integer ARANGODB_BATCH = FlowWriteConfigurations.getIntProperty(0, "arangoDB.batch");

public static final Integer UPDATE_ARANGO_BATCH = FlowWriteConfigurations.getIntProperty(0, "update.arango.batch");
public static final String ARANGODB_READ_LIMIT = FlowWriteConfigurations.getStringProperty(0, "arangoDB.read.limit");
public static final Integer THREAD_POOL_NUMBER = FlowWriteConfigurations.getIntProperty(0, "thread.pool.number");
public static final Integer THREAD_AWAIT_TERMINATION_TIME = FlowWriteConfigurations.getIntProperty(0, "thread.await.termination.time");
public static final Integer SINK_BATCH_TIME_OUT = FlowWriteConfigurations.getIntProperty(0, "sink.batch.time.out");
public static final Integer SINK_BATCH = FlowWriteConfigurations.getIntProperty(0, "sink.batch");
}
23 src/main/java/com/zdjizhi/common/IpKeysSelector.java (new file)
@@ -0,0 +1,23 @@
package com.zdjizhi.common;


import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;

import java.util.Map;

/**
 * @description:
 * @author: zhq
 * @create: 2022-07-05
 **/
public class IpKeysSelector implements KeySelector<Map<String, Object>, Tuple2<String, String>> {

@Override
public Tuple2<String, String> getKey(Map<String,Object> log) throws Exception {

return Tuple2.of(
String.valueOf(log.get("src_ip")),
String.valueOf(log.get("dst_ip")));
}
}
65 src/main/java/com/zdjizhi/common/TopMetricProcessV2.java (new file)
@@ -0,0 +1,65 @@
package com.zdjizhi.common;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeSet;

public class TopMetricProcessV2 extends ProcessFunction<Map<String,Object>, Collector<Map<String,Object>>> {


private ValueState<Long> currentTimer;
private ListState<Map<String,Object>> itemState;

@Override
public void open(Configuration parameters) throws Exception {
currentTimer = getRuntimeContext().getState(new ValueStateDescriptor<>("_timer", Types.LONG));
ListStateDescriptor<Map<String,Object>> itemViewStateDesc = new ListStateDescriptor("_state", Map.class);
itemState = getRuntimeContext().getListState(itemViewStateDesc);
}

@Override
public void processElement(Map<String,Object> value, Context context, Collector<Collector<Map<String,Object>>> collector) throws Exception {
// if no timer has been registered yet, register a new one
Long curTimeStamp = currentTimer.value();
if (curTimeStamp == null || curTimeStamp == 0) {
long onTimer = context.timestamp() + FlowWriteConfig.SINK_BATCH_TIME_OUT * 1000;
context.timerService().registerEventTimeTimer(onTimer);
currentTimer.update(onTimer);
}
itemState.add(value);
}

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Collector<Map<String, Object>>> out) throws Exception {
super.onTimer(timestamp, ctx, out);

Iterator<Map<String,Object>> iterator = itemState.get().iterator();
if(iterator.hasNext()){
out.collect((Collector<Map<String, Object>>) iterator.next());
}
// if (baseLogs.size() > FlowWriteConfig.SINK_BATCH) {
// Map last = baseLogs.last();
// if (Double.compare(map.get(orderBy).doubleValue(), last.get(orderBy).doubleValue()) > 0) {
// baseLogs.pollLast();
// baseLogs.add(map);
// }
// } else {
// baseLogs.add(map);
// }
// }
currentTimer.clear();
itemState.clear();


}
}
@@ -3,7 +3,6 @@ package com.zdjizhi.etl;
import cn.hutool.core.convert.Convert;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

@@ -5,8 +5,6 @@ import cn.hutool.core.util.StrUtil;
import com.zdjizhi.enums.DnsType;
import com.zdjizhi.pojo.DbLogEntity;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -9,7 +9,6 @@ import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
45 src/main/java/com/zdjizhi/etl/Ip2IpGraphProcessFunction.java (new file)
@@ -0,0 +1,45 @@
package com.zdjizhi.etl;

import cn.hutool.core.convert.Convert;
import cn.hutool.core.date.DateUtil;
import com.arangodb.entity.BaseDocument;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;


/**
 * Deduplicate records by IP pair
 */
public class Ip2IpGraphProcessFunction extends ProcessWindowFunction<Map<String, Object>, BaseDocument, Tuple2<String, String>, TimeWindow> {

private static final Logger logger = LoggerFactory.getLogger(Ip2IpGraphProcessFunction.class);

@Override
public void process(Tuple2<String, String> keys, Context context, Iterable<Map<String, Object>> elements, Collector<BaseDocument> out) {

try {
long lastFoundTime = DateUtil.currentSeconds();
for (Map<String, Object> log : elements) {
long connStartTimetime = Convert.toLong(log.get("conn_start_time"));
lastFoundTime = connStartTimetime > lastFoundTime ? connStartTimetime : lastFoundTime;
}
BaseDocument baseDocument = new BaseDocument();
baseDocument.setKey(String.join("-", keys.f0, keys.f1));
baseDocument.addAttribute("src_ip", keys.f0);
baseDocument.addAttribute("dst_ip", keys.f1);
baseDocument.addAttribute("last_found_time", lastFoundTime);
out.collect(baseDocument);
logger.debug("intermediate aggregation result: {}", baseDocument.toString());

} catch (Exception e) {
logger.error("failed to build intermediate aggregation result, middleResult: {}", e);
}
}

}
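For orientation, an illustrative sketch (not part of the commit) of the single document this window function emits for one key pair; the IP values and the timestamp below are made up:

// Example key: Tuple2.of("10.0.0.1", "10.0.0.2") -> one document per key and window.
BaseDocument doc = new BaseDocument();
doc.setKey("10.0.0.1-10.0.0.2");                  // String.join("-", keys.f0, keys.f1)
doc.addAttribute("src_ip", "10.0.0.1");
doc.addAttribute("dst_ip", "10.0.0.2");
doc.addAttribute("last_found_time", 1657000000L); // max of the window's conn_start_time values and the wall-clock seconds at processing time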
@@ -1,10 +1,8 @@
package com.zdjizhi.etl;

import cn.hutool.core.convert.Convert;
import com.zdjizhi.enums.LogMetadata;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
@@ -3,16 +3,20 @@ package com.zdjizhi.topology;
import cn.hutool.core.convert.Convert;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.arangodb.entity.BaseDocument;
import com.zdjizhi.common.DnsKeysSelector;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.common.KeysSelector;
import com.zdjizhi.common.IpKeysSelector;
import com.zdjizhi.etl.ConnProcessFunction;
import com.zdjizhi.etl.Ip2IpGraphProcessFunction;
import com.zdjizhi.etl.DnsProcessFunction;
import com.zdjizhi.etl.SketchProcessFunction;
import com.zdjizhi.utils.arangodb.ArangoDBSink;
import com.zdjizhi.utils.ck.ClickhouseSink;
import com.zdjizhi.utils.kafka.KafkaConsumer;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
@@ -33,17 +37,17 @@ public class LogFlowWriteTopology {
//maximum time between two outputs (milliseconds)
env.setBufferTimeout(FlowWriteConfig.BUFFER_TIMEOUT);

//1connection,2dns
//1 connection,2 dns
if (FlowWriteConfig.LOG_NEED_COMPLETE == 1) {
//connection
DataStream<Map<String, Object>> connSource = env.addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_CONNECTION))
.filter(x -> Objects.nonNull(x))
.filter(Objects::nonNull)
.setParallelism(SOURCE_PARALLELISM)
.name(SOURCE_KAFKA_TOPIC_CONNECTION);

//sketch
DataStream<Map<String, Object>> sketchSource = env.addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_SKETCH))
.filter(x -> Objects.nonNull(x))
.filter(Objects::nonNull)
.setParallelism(FlowWriteConfig.SOURCE_PARALLELISM)
.name(SOURCE_KAFKA_TOPIC_SKETCH);
@@ -52,7 +56,7 @@ public class LogFlowWriteTopology {
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_ORDERNESS))
.withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("conn_start_time")) * 1000))
.keyBy(new KeysSelector())
.keyBy(new IpKeysSelector())
.window(TumblingEventTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION)))
.process(new ConnProcessFunction())
.filter(x -> Objects.nonNull(x))
@@ -61,22 +65,34 @@ public class LogFlowWriteTopology {
DataStream<Map<String, Object>> sketchTransformStream = sketchSource.assignTimestampsAndWatermarks(WatermarkStrategy
.<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_ORDERNESS))
.withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("sketch_start_time")) * 1000))
.keyBy(new KeysSelector())
.keyBy(new IpKeysSelector())
.window(TumblingEventTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION)))
.process(new SketchProcessFunction())
.filter(x -> Objects.nonNull(x))
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);

//write to the ClickHouse sink
//write to ArangoDB
SingleOutputStreamOperator<BaseDocument> ip2ipGraph = connTransformStream.union(sketchTransformStream)
.keyBy(new IpKeysSelector())
.window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
.process(new Ip2IpGraphProcessFunction())
.filter(x -> Objects.nonNull(x))
.setParallelism(TRANSFORM_PARALLELISM);

//write to the ClickHouse sink in batches
connSource.addSink(new ClickhouseSink(SINK_CK_TABLE_CONNECTION)).name("CKSink");
sketchSource.addSink(new ClickhouseSink(SINK_CK_TABLE_SKETCH)).name("CKSink");
connTransformStream.addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_CONNECTION)).name("CKSink");
sketchTransformStream.addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_CONNECTION)).name("CKSink");

// ip2ipGraph.addSink(new ArangoDBSink("R_VISIT_IP2IP"));



} else if (FlowWriteConfig.LOG_NEED_COMPLETE == 2) {

DataStream<Map<String, Object>> dnsSource = env.addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_DNS))
.filter(x -> Objects.nonNull(x))
.filter(Objects::nonNull)
.setParallelism(FlowWriteConfig.SOURCE_PARALLELISM)
.name(FlowWriteConfig.SOURCE_KAFKA_TOPIC_DNS);
@@ -90,14 +106,14 @@ public class LogFlowWriteTopology {
.setParallelism(TRANSFORM_PARALLELISM);

//filter out empty records so they are not sent to Kafka
dnsSource.filter(x -> Objects.nonNull(x))
dnsSource.filter(Objects::nonNull)
.setParallelism(FlowWriteConfig.SINK_PARALLELISM)
.name("FilterOriginalData")
.addSink(new ClickhouseSink(SINK_CK_TABLE_DNS))
.setParallelism(FlowWriteConfig.SINK_PARALLELISM)
.name("CKSink");

dnsTransform.filter(x -> Objects.nonNull(x))
dnsTransform.filter(Objects::nonNull)
.setParallelism(FlowWriteConfig.SINK_PARALLELISM)
.name("FilterOriginalData")
.addSink(new ClickhouseSink(SINK_CK_TABLE_DNS))
@@ -0,0 +1,95 @@
package com.zdjizhi.utils.arangodb;

import com.arangodb.ArangoCollection;
import com.arangodb.ArangoCursor;
import com.arangodb.ArangoDB;
import com.arangodb.ArangoDatabase;
import com.arangodb.entity.DocumentCreateEntity;
import com.arangodb.entity.ErrorEntity;
import com.arangodb.entity.MultiDocumentEntity;
import com.arangodb.model.AqlQueryOptions;
import com.arangodb.model.DocumentCreateOptions;
import com.arangodb.util.MapBuilder;
import com.zdjizhi.common.FlowWriteConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.List;
import java.util.Map;

public class ArangoDBConnect {
private static final Logger LOG = LoggerFactory.getLogger(ArangoDBConnect.class);
private static ArangoDB arangoDB = null;
private static ArangoDBConnect conn = null;
static {
getArangoDB();
}

private static void getArangoDB(){
arangoDB = new ArangoDB.Builder()
.maxConnections(FlowWriteConfig.THREAD_POOL_NUMBER)
.host(FlowWriteConfig.ARANGODB_HOST, FlowWriteConfig.ARANGODB_PORT)
.user(FlowWriteConfig.ARANGODB_USER)
.password(FlowWriteConfig.ARANGODB_PASSWORD)
.build();
}

public static synchronized ArangoDBConnect getInstance(){
if (null == conn){
conn = new ArangoDBConnect();
}
return conn;
}

private ArangoDatabase getDatabase(){
return arangoDB.db(FlowWriteConfig.ARANGODB_DB_NAME);
}

public void clean(){
try {
if (arangoDB != null){
arangoDB.shutdown();
}
}catch (Exception e){
LOG.error(e.getMessage());
}
}

public <T> ArangoCursor<T> executorQuery(String query,Class<T> type){
ArangoDatabase database = getDatabase();
Map<String, Object> bindVars = new MapBuilder().get();
AqlQueryOptions options = new AqlQueryOptions()
.ttl(FlowWriteConfig.ARANGODB_TTL);
try {
return database.query(query, bindVars, options, type);
}catch (Exception e){
LOG.error(e.getMessage());
return null;
}finally {
bindVars.clear();
}
}

public <T> void overwrite(List<T> docOverwrite, String collectionName){
ArangoDatabase database = getDatabase();
try {
ArangoCollection collection = database.collection(collectionName);
if (!docOverwrite.isEmpty()){
DocumentCreateOptions documentCreateOptions = new DocumentCreateOptions();
documentCreateOptions.overwrite(true);
documentCreateOptions.silent(true);
MultiDocumentEntity<DocumentCreateEntity<T>> documentCreateEntityMultiDocumentEntity = collection.insertDocuments(docOverwrite, documentCreateOptions);
Collection<ErrorEntity> errors = documentCreateEntityMultiDocumentEntity.getErrors();
for (ErrorEntity errorEntity:errors){
LOG.error("error writing to arangoDB: "+errorEntity.getErrorMessage());
}
}
}catch (Exception e){
LOG.error("update failed: "+e.toString());
}finally {
docOverwrite.clear();
}
}

}
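A minimal usage sketch of the helper above (not part of the commit); the collection name "ip2ip_test" and the AQL string are assumptions, everything else follows the class as added:

// Illustrative usage of ArangoDBConnect; collection and query names are assumed.
import com.arangodb.ArangoCursor;
import com.arangodb.entity.BaseDocument;
import com.zdjizhi.utils.arangodb.ArangoDBConnect;

import java.util.ArrayList;
import java.util.List;

public class ArangoDBConnectExample {
    public static void main(String[] args) {
        ArangoDBConnect arango = ArangoDBConnect.getInstance();

        BaseDocument doc = new BaseDocument();
        doc.setKey("10.0.0.1-10.0.0.2");
        doc.addAttribute("last_found_time", 1657000000L);

        // overwrite() behaves as an upsert: overwrite(true) replaces an existing
        // document with the same _key; note the passed list is cleared afterwards,
        // so a mutable list is required.
        List<BaseDocument> batch = new ArrayList<>();
        batch.add(doc);
        arango.overwrite(batch, "ip2ip_test");

        // executorQuery() runs an AQL query with the configured TTL and returns a
        // cursor, or null if the query failed.
        ArangoCursor<BaseDocument> cursor = arango.executorQuery(
                "FOR d IN ip2ip_test LIMIT 10 RETURN d", BaseDocument.class);
        if (cursor != null) {
            cursor.forEachRemaining(d -> System.out.println(d.getKey()));
        }

        arango.clean();
    }
}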
47 src/main/java/com/zdjizhi/utils/arangodb/ArangoDBSink.java (new file)
@@ -0,0 +1,47 @@
package com.zdjizhi.utils.arangodb;

import com.arangodb.entity.BaseDocument;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.util.List;

/**
 * @description:
 * @author: zhq
 * @create: 2022-07-07
 **/
public class ArangoDBSink extends RichSinkFunction<List<BaseDocument>> {

private static ArangoDBConnect arangoDBConnect;
private String collection;

@Override
public void invoke(List<BaseDocument> baseDocuments, Context context) throws Exception {
arangoDBConnect.overwrite(baseDocuments, getCollection());
}

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
arangoDBConnect = ArangoDBConnect.getInstance();
}

@Override
public void close() throws Exception {
super.close();
arangoDBConnect.clean();
}

public ArangoDBSink(String collection) {
this.collection = collection;
}

public String getCollection() {
return collection;
}

public void setCollection(String collection) {
this.collection = collection;
}
}
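Note that this sink consumes List<BaseDocument>, while ip2ipGraph in LogFlowWriteTopology is a stream of individual BaseDocument and its addSink call is commented out there. A hedged sketch (not part of the commit) of one way to bridge the two by batching with a count window before the sink; using FlowWriteConfig.SINK_BATCH as the batch size is an assumption:

// Sketch only: collect documents into batches before ArangoDBSink.
// Assumed imports: org.apache.flink.streaming.api.functions.windowing.AllWindowFunction,
// org.apache.flink.streaming.api.windowing.windows.GlobalWindow, org.apache.flink.util.Collector,
// java.util.ArrayList, java.util.List.
ip2ipGraph
        .countWindowAll(FlowWriteConfig.SINK_BATCH)
        .apply(new AllWindowFunction<BaseDocument, List<BaseDocument>, GlobalWindow>() {
            @Override
            public void apply(GlobalWindow window, Iterable<BaseDocument> values,
                              Collector<List<BaseDocument>> out) {
                List<BaseDocument> batch = new ArrayList<>();
                values.forEach(batch::add);
                out.collect(batch);
            }
        })
        .addSink(new ArangoDBSink("R_VISIT_IP2IP"))
        .name("ArangoDBSink");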
165 src/main/java/com/zdjizhi/utils/ck/CKSink.java (new file)
@@ -0,0 +1,165 @@
package com.zdjizhi.utils.ck;

import cn.hutool.core.util.StrUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHouseDataSource;
import ru.yandex.clickhouse.settings.ClickHouseProperties;

import java.sql.PreparedStatement;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

public class CKSink extends RichSinkFunction<Map<String, Object>> {

private static final Log log = LogFactory.get();

private static int count = 1;
private static ClickHouseConnection connection = null;
private static PreparedStatement preparedStatement = null;

static String database = "default";
static String address = "jdbc:clickhouse://192.168.45.102:8123/"+database;
static String username = "default";
static String password = "galaxy2019";
static String fieldStr = "id,name,age";
static String tableName = "user_table";

private String insertSql;

//create the connection object and session
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
try {
connection = getConn();
log.info("get clickhouse connection success !");
String insertSql = preparedSql(fieldStr, tableName);
connection.setAutoCommit(false);
preparedStatement = connection.prepareStatement(insertSql);
} catch (Exception e) {
log.error("error initializing the clickhouse connection:", e);
}
}

// @Override
// public void close() throws Exception {
// super.close();
// //close the connection and release resources
// if (connection != null) {
// connection.close();
// }
// if (preparedStatement != null) {
// preparedStatement.close();
// }
// }

//write in batches with auto-commit disabled
@Override
public void invoke(Map<String, Object> data, Context context) {
log.info(" invoke method ");

try {

LinkedList<Object> values = new LinkedList<>(data.values());
for (int i = 1; i <= values.size(); i++) {
Object val = values.get(i - 1);
if (val instanceof Long) {
preparedStatement.setLong((i), Long.valueOf(StrUtil.toString(val)));
} else if (val instanceof Integer) {
preparedStatement.setLong((i), Long.valueOf(StrUtil.toString(val)));
} else if (val instanceof Boolean) {
preparedStatement.setBoolean((i), Boolean.valueOf(StrUtil.toString(val)));
} else {
preparedStatement.setString((i), StrUtil.toString(val));
}
}

preparedStatement.addBatch();
count = count + 1;
try {
// if (count >= 50000) {
// preparedStatement.executeBatch();
// connection.commit();
// preparedStatement.clearBatch();
// count = 1;
// }

//commit every 10k records
// if (count % 10000 == 0) {
// preparedStatement.executeBatch();
// connection.commit();
// preparedStatement.clearBatch();
// }
preparedStatement.executeBatch();
connection.commit();

} catch (Exception ee) {
log.error("error inserting data into clickhouse:", ee);
}
} catch (Exception ex) {
log.error("ClickhouseSink insert error:", ex);
}
}

public static ClickHouseConnection getConn() {

int socketTimeout = 600000;
ClickHouseProperties properties = new ClickHouseProperties();
properties.setUser(username);
properties.setPassword(password);
properties.setDatabase(database);
properties.setSocketTimeout(socketTimeout);
ClickHouseDataSource clickHouseDataSource = new ClickHouseDataSource(address, properties);
ClickHouseConnection conn = null;
try {
conn = clickHouseDataSource.getConnection();
return conn;
} catch (Exception e) {
log.error(e.getMessage());
e.printStackTrace();
}
return null;
}

public static Map getField() {

return null;
}


public String preparedSql(String fieldStr, String tableName) {
List<String> fields = StrUtil.split(fieldStr, ",");
return getInsertSql(fields, tableName);
}

public String getInsertSql(List<String> fileds, String tableName) {
String sql = "";
String sqlStr1 = "INSERT INTO `" + database + "`." + tableName + " (";
String sqlStr2 = ") VALUES (";
String sqlStr3 = ")";
String sqlKey = "";
String sqlValue = "";
for (String key : fileds) {
sqlKey += key + ",";
sqlValue += "?,";
}
sqlKey = sqlKey.substring(0, sqlKey.length() - 1);
sqlValue = sqlValue.substring(0, sqlValue.length() - 1);
sql = StrUtil.concat(true, sqlStr1, sqlKey, sqlStr2, sqlValue, sqlStr3);

// String placeholders = Arrays.stream(fieldNames)
// .map(f -> "?")
// .collect(Collectors.joining(", "));
// return "INSERT INTO " + quoteIdentifier(tableName) +
// "(" + columns + ")" + " VALUES (" + placeholders + ")";


log.info(sql);
return sql;
}
}
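For reference, an illustrative note (not part of the commit) on the statement the helper above builds from its hard-coded defaults:

// Illustrative only: the SQL produced by preparedSql() for the defaults above
// (database = "default", fieldStr = "id,name,age", tableName = "user_table").
String sql = new CKSink().preparedSql("id,name,age", "user_table");
// sql == "INSERT INTO `default`.user_table (id,name,age) VALUES (?,?,?)"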
@@ -6,21 +6,14 @@ import cn.hutool.log.LogFactory;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHouseDataSource;
import ru.yandex.clickhouse.settings.ClickHouseProperties;

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static ru.yandex.clickhouse.ClickHouseUtil.quoteIdentifier;

public class CKSinkFlatMap extends RichFlatMapFunction<Map<String, Object>, String> {

@@ -0,0 +1,41 @@
/*
package com.zdjizhi.utils.ck;

import java.util.Optional;

*/
/**
 * ClickHouse dialect
 *//*

public class ClickHouseJDBCDialect implements JDBCDialect {

private static final long serialVersionUID = 1L;

@Override
public boolean canHandle(String url) {
return url.startsWith("jdbc:clickhouse:");
}

@Override
public Optional<String> defaultDriverName() {
return Optional.of("ru.yandex.clickhouse.ClickHouseDriver");
}

@Override
public String quoteIdentifier(String identifier) {
return "`" + identifier + "`";
}

@Override
public Optional<String> getUpsertStatement(String tableName, String[] fieldNames, String[] uniqueKeyFields) {
return Optional.of(getInsertIntoStatement(tableName, fieldNames));
}

@Override
public String getUpdateStatement(String tableName, String[] fieldNames, String[] conditionFields) {
return null;
}

}
*/
@@ -27,15 +27,7 @@ public class ClickhouseSink extends RichSinkFunction<Map<String,Object>> {
public String sink;

static {
try {
Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
connection = DriverManager.getConnection("jdbc:clickhouse://" + CK_HOSTS + "/" + CK_DATABASE, CK_USERNAME, CK_PIN);
// BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://node01:8123,node02:8123,node03:8123/default", props);
// connection = dataSource.getConnection();
log.info("get clickhouse connection success");
} catch (ClassNotFoundException | SQLException e) {
log.error("clickhouse connection error ,{}", e);
}

}

public ClickhouseSink(String sink) {
@@ -57,7 +49,15 @@ public class ClickhouseSink extends RichSinkFunction<Map<String,Object>> {

@Override
public void open(Configuration parameters) throws Exception {

try {
Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
connection = DriverManager.getConnection("jdbc:clickhouse://" + CK_HOSTS + "/" + CK_DATABASE, CK_USERNAME, CK_PIN);
// BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://node01:8123,node02:8123,node03:8123/default", props);
// connection = dataSource.getConnection();
log.info("get clickhouse connection success");
} catch (ClassNotFoundException | SQLException e) {
log.error("clickhouse connection error ,{}", e);
}
}

@Override
56 src/main/java/com/zdjizhi/utils/ck/ClickhouseUtil.java (new file)
@@ -0,0 +1,56 @@
/*
package com.zdjizhi.utils.ck;

import org.apache.flink.api.java.utils.ParameterTool;
import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseClusterSettings;
import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseSinkConst;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

*/
/**
 * @description:
 * @author: zhq
 * @create: 2022-06-29
 **//*

public class ClickhouseUtil {


public static ParameterTool getGlobalPro() {
Map<String, String> sinkPro = new HashMap<>();
//sink Properties
sinkPro.put(ClickHouseClusterSettings.CLICKHOUSE_HOSTS, "sc.chproxy.bigdata.services.org:10000");

// ClickHouse local write account
sinkPro.put(ClickHouseClusterSettings.CLICKHOUSE_USER, "default");
sinkPro.put(ClickHouseClusterSettings.CLICKHOUSE_PASSWORD, "galaxy2019");
// sink common
sinkPro.put(ClickHouseSinkConst.TIMEOUT_SEC, "10");
sinkPro.put(ClickHouseSinkConst.NUM_WRITERS, "10");
sinkPro.put(ClickHouseSinkConst.NUM_RETRIES, "3");
sinkPro.put(ClickHouseSinkConst.QUEUE_MAX_CAPACITY, "1000000");
sinkPro.put(ClickHouseSinkConst.IGNORING_CLICKHOUSE_SENDING_EXCEPTION_ENABLED, "false");
sinkPro.put(ClickHouseSinkConst.FAILED_RECORDS_PATH, "d:/");//when run locally this creates a folder named "d:" inside the project to hold failed-record details

// env - sinkPro
ParameterTool parameters = ParameterTool.fromMap(sinkPro);


return parameters;

}

public static Properties getCKPro() {
// ClickHouseSink - sinkPro
Properties props = new Properties();
props.put(ClickHouseSinkConst.TARGET_TABLE_NAME, "database_1564.ch_zjk_test_local");
props.put(ClickHouseSinkConst.MAX_BUFFER_SIZE, "10000");
return props;
}


}
*/
39 src/main/java/com/zdjizhi/utils/ck/JDBCDialect.java (new file)
@@ -0,0 +1,39 @@
/*
package com.zdjizhi.utils.ck;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Optional;
import java.util.stream.Collectors;

import static ru.yandex.clickhouse.ClickHouseUtil.quoteIdentifier;

*/
/**
 * Handle the SQL dialect of jdbc driver.
 *//*

public interface JDBCDialect extends Serializable {
default Optional<String> getUpsertStatement(
String tableName, String[] fieldNames, String[] uniqueKeyFields) {
return Optional.empty();
}
default String getInsertIntoStatement(String tableName, String[] fieldNames) {
String columns = Arrays.stream(fieldNames)
.map(this::quoteIdentifier)
.collect(Collectors.joining(", "));
String placeholders = Arrays.stream(fieldNames)
.map(f -> "?")
.collect(Collectors.joining(", "));
return "INSERT INTO " + quoteIdentifier(tableName) +
"(" + columns + ")" + " VALUES (" + placeholders + ")";
}

default String getDeleteStatement(String tableName, String[] conditionFields) {
String conditionClause = Arrays.stream(conditionFields)
.map(f -> quoteIdentifier(f) + "=?")
.collect(Collectors.joining(" AND "));
return "DELETE FROM " + quoteIdentifier(tableName) + " WHERE " + conditionClause;
}
}
*/
14 src/main/java/com/zdjizhi/utils/ck/JDBCDialects.java (new file)
@@ -0,0 +1,14 @@
/*
package com.zdjizhi.utils.ck;

import java.util.Arrays;
import java.util.List;

public final class JDBCDialects {

private static final List<JDBCDialect> DIALECTS = Arrays.asList(
// new DerbyDialect(),
// new MySQLDialect(),
// new PostgresDialect()
);
}*/
@@ -1,17 +1,8 @@
package com.zdjizhi.utils.kafka;

import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.pojo.DbLogEntity;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;


import java.util.Map;
import java.util.Properties;
@@ -3,8 +3,6 @@ package com.zdjizhi.utils.kafka;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.enums.LogMetadata;
import com.zdjizhi.pojo.DbLogEntity;
import com.zdjizhi.utils.JsonMapper;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
@@ -1,14 +1,8 @@
package com.zdjizhi.utils.system;

import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.exception.NacosException;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.StringUtil;

import java.io.IOException;
import java.io.StringReader;
import java.util.Locale;
import java.util.Properties;