Optimize code: unify time units to seconds and remove some redundant code.

zhanghongqing
2022-07-22 15:35:07 +08:00
parent fa3f628658
commit 9f22443c7c
21 changed files with 217 additions and 783 deletions

pom.xml

@@ -6,7 +6,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>knowledge-log</artifactId>
<version>202207</version>
<version>20220722</version>
<name>log-completion-schema</name>
<url>http://www.example.com</url>
@@ -174,23 +174,6 @@
<scope>${scope.type}</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.10</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-over-slf4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>cglib</groupId>
<artifactId>cglib-nodep</artifactId>
@@ -204,12 +187,6 @@
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.2</version>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>

(properties configuration file)

@@ -1,6 +1,6 @@
#-------------------------------- Address configuration ------------------------------#
# Management Kafka address; join multiple hosts with commas: ip1:9094,ip2:9094
source.kafka.servers=192.168.45.102:9094
source.kafka.servers=192.168.44.85:9094,192.168.44.86:9094,192.168.44.87:9094
# Management output Kafka address
sink.kafka.servers=
@@ -8,14 +8,14 @@ sink.kafka.servers=
tools.library=
#-------------------------------- Kafka consumer/producer configuration ------------------------------#
# Consumer group for the source topic; stores this consumer's offset information (can be named after the topology) so the next read does not re-consume data
group.id=KNOWLEDGE-GROUP
group.id=KNOWLEDGE-GROUP3
#-------------------------------- Topology configuration ------------------------------#
# Consumer parallelism
source.parallelism=12
source.parallelism=1
# Transform function parallelism
transform.parallelism=12
transform.parallelism=1
# Kafka producer parallelism
sink.parallelism=12
sink.parallelism=1
#-------------------------------- Business configuration ------------------------------#
# 1: connection logs, 2: DNS logs
@@ -24,7 +24,7 @@ log.type=1
# Producer compression type: none or snappy
producer.kafka.compression.type=none
# Kafka source topics
source.kafka.topic.connection=CONNECTION-RECORD-LOG
source.kafka.topic.connection=test12
source.kafka.topic.sketch=CONNECTION-SKETCH-RECORD-LOG
source.kafka.topic.dns=DNS-RECORD-LOG
# ClickHouse local tables to write to
@@ -40,10 +40,10 @@ sink.arangodb.table.r.mx.domain2domain=R_MX_DOMAIN2DOMAIN
sink.arangodb.table.r.resolve.domain2ip=R_RESOLVE_DOMAIN2IP
sink.arangodb.table.r.nx.domain2domain=R_NX_DOMAIN2DOMAIN
# Use Flink to write raw logs to the database, or install gohangout for ingestion; 0: no, 1: yes
# Use Flink to write raw logs to the database; 0: no, 1: yes
sink.ck.raw.log.insert.open=1
# ClickHouse configuration; join multiple hosts with commas: ip1:8123,ip2:8123
ck.hosts=192.168.45.102:8123,192.168.45.102:8123
ck.hosts=192.168.44.85:8123,192.168.44.86:8123,192.168.44.87:8123
ck.database=tsg_galaxy_v3
ck.username=default
ck.pin=galaxy2019
@@ -53,7 +53,7 @@ ck.socket.timeout=300000
# ClickHouse insert batch size, in rows
ck.batch=10
# Time to accumulate a batch before the ClickHouse insert, in milliseconds
sink.ck.batch.delay.time=1000
sink.ck.batch.delay.time=2000
# Flink maximum log delay (watermark) time
flink.watermark.max.delay.time=50
@@ -68,8 +68,8 @@ arangodb.port=8529
arangodb.user=root
arangodb.password=galaxy_2019
arangodb.db.name=knowledge
arangodb.batch=100000
arangodb.batch=10000
arangodb.ttl=3600
arangodb.thread.pool.number=10
# Batch accumulation time, in milliseconds (ms)
sink.arangodb.batch.delay.time=5
sink.arangodb.batch.delay.time=1000

DnsType.java

@@ -9,11 +9,11 @@ import static com.zdjizhi.common.FlowWriteConfig.*;
**/
public enum DnsType {
// DNS record type, its code, and the target (sink) table
A("a", "0x0001", R_RESOLVE_DOMAIN2IP),
AAAA("aaaa", "0x001c", R_RESOLVE_DOMAIN2IP),
CNAME("cname", "0x0005", R_CNAME_DOMAIN2DOMAIN),
MX("mx", "0x000f", R_MX_DOMAIN2DOMAIN),
NS("ns", "0x0002", R_NX_DOMAIN2DOMAIN);
A("a", "1", R_RESOLVE_DOMAIN2IP),
AAAA("aaaa", "28", R_RESOLVE_DOMAIN2IP),
CNAME("cname", "5", R_CNAME_DOMAIN2DOMAIN),
MX("mx", "15", R_MX_DOMAIN2DOMAIN),
NS("ns", "2", R_NX_DOMAIN2DOMAIN);
private String type;
private String code;

CountTriggerWithTimeout.java

@@ -54,7 +54,6 @@ public class CountTriggerWithTimeout<T> extends Trigger<T, TimeWindow> {
this.stateName = stateName;
}
private TriggerResult fireAndPurge(TimeWindow window, TriggerContext ctx) throws Exception {
clear(window, ctx);
return TriggerResult.FIRE_AND_PURGE;
@@ -90,7 +89,7 @@ public class CountTriggerWithTimeout<T> extends Trigger<T, TimeWindow> {
if (time >= window.getEnd()) {
return TriggerResult.CONTINUE;
} else {
logger.info("fire with process tiem: " + time);
logger.debug("fire with process tiem: " + time);
return fireAndPurge(window, ctx);
}
}
@@ -106,7 +105,7 @@ public class CountTriggerWithTimeout<T> extends Trigger<T, TimeWindow> {
if (time >= window.getEnd()) {
return TriggerResult.CONTINUE;
} else {
logger.info("fire with event tiem: " + time);
logger.debug("fire with event tiem: " + time);
return fireAndPurge(window, ctx);
}
}
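For orientation, the decision rule of this trigger can be restated on its own: it fires and purges either when the buffered element count reaches the configured threshold (assumed from the class name; that branch is not shown in the hunks above) or when a timer goes off before the window end. A minimal, runnable sketch of that rule, not the project's class:

public class CountOrTimeoutRule {
    enum Result { CONTINUE, FIRE_AND_PURGE }

    // Assumed count branch: fire as soon as the buffered element count reaches the threshold.
    static Result onElement(long seenCount, long maxCount) {
        return seenCount >= maxCount ? Result.FIRE_AND_PURGE : Result.CONTINUE;
    }

    // Timer branch, mirroring the hunks above: a timer that fires before the
    // window end flushes the batch; at or past the end, the trigger continues.
    static Result onTimer(long timerTime, long windowEnd) {
        return timerTime >= windowEnd ? Result.CONTINUE : Result.FIRE_AND_PURGE;
    }

    public static void main(String[] args) {
        System.out.println(onElement(10, 10));       // FIRE_AND_PURGE
        System.out.println(onTimer(5_000, 10_000));  // FIRE_AND_PURGE
        System.out.println(onTimer(12_000, 10_000)); // CONTINUE
    }
}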

LogFormat.java (new file)

@@ -0,0 +1,19 @@
package com.zdjizhi.etl;
import com.zdjizhi.utils.json.TypeUtils;
import java.util.Map;
public class LogFormat {
public static Map<String, Object> connTime(Map<String, Object> value) {
value.put("conn_start_time", TypeUtils.coverMSToS(value.get("conn_start_time")));
return value;
}
public static Map<String, Object> sketchTime(Map<String, Object> value) {
value.put("sketch_start_time", TypeUtils.coverMSToS(value.get("sketch_start_time")));
return value;
}
}

ConnProcessFunction.java

@@ -1,6 +1,7 @@
package com.zdjizhi.etl.connection;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.date.DateUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -47,8 +48,8 @@ public class ConnProcessFunction extends ProcessWindowFunction<Map<String, Objec
long sessions = 0;
long packets = 0;
long bytes = 0;
long startTime = System.currentTimeMillis() / 1000;
long endTime = System.currentTimeMillis() / 1000;
long startTime = DateUtil.currentSeconds();
long endTime = DateUtil.currentSeconds();
try {
for (Map<String, Object> newSketchLog : elements) {
sessions++;

ConnTimeMapFunction.java (new file)

@@ -0,0 +1,16 @@
package com.zdjizhi.etl.connection;
import com.zdjizhi.utils.json.TypeUtils;
import org.apache.flink.api.common.functions.MapFunction;
import java.util.Map;
public class ConnTimeMapFunction implements MapFunction<Map<String, Object>, Map<String, Object>> {
@Override
public Map<String, Object> map(Map<String, Object> value) throws Exception {
value.put("conn_start_time", TypeUtils.coverMSToS(value.get("conn_start_time")));
value.put("log_gen_time", TypeUtils.coverMSToS(value.get("log_gen_time")));
return value;
}
}

Ip2IpGraphProcessFunction.java

@@ -24,10 +24,10 @@ public class Ip2IpGraphProcessFunction extends ProcessWindowFunction<Map<String,
public void process(Tuple2<String, String> keys, Context context, Iterable<Map<String, Object>> elements, Collector<Map<String, Object>> out) {
try {
long lastFoundTime = DateUtil.currentSeconds();
for (Map<String, Object> log : elements) {
long connStartTimetime = Convert.toLong(log.get("start_time"));
lastFoundTime = connStartTimetime > lastFoundTime ? connStartTimetime : lastFoundTime;
long connStartTime = Convert.toLong(log.get("start_time"));
lastFoundTime = connStartTime > lastFoundTime ? connStartTime : lastFoundTime;
}
Map<String, Object> newLog = new HashMap<>();
newLog.put("src_ip", keys.f0);

SketchProcessFunction.java

@@ -1,6 +1,7 @@
package com.zdjizhi.etl.connection;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.date.DateUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -61,8 +62,8 @@ public class SketchProcessFunction extends ProcessWindowFunction<Map<String, Obj
long sessions = 0;
long packets = 0;
long bytes = 0;
long startTime = System.currentTimeMillis() / 1000;
long endTime = System.currentTimeMillis() / 1000;
long startTime = DateUtil.currentSeconds();
long endTime = DateUtil.currentSeconds();
try {
for (Map<String, Object> newSketchLog : elements) {
sessions += Convert.toLong(newSketchLog.get("sketch_sessions"));

DnsMapFunction.java

@@ -1,15 +1,17 @@
package com.zdjizhi.etl.dns;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.google.common.base.Joiner;
import com.zdjizhi.enums.DnsType;
import com.zdjizhi.utils.json.TypeUtils;
import org.apache.flink.api.common.functions.MapFunction;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
* @author zhq
@@ -22,55 +24,57 @@ public class DnsMapFunction implements MapFunction<Map<String, Object>, Map<Stri
@Override
public Map<String, Object> map(Map<String, Object> rawLog) throws Exception {
try {
Object response = rawLog.get("response");
JSONArray responseArray = JSONUtil.parseArray(response);
String dnsA = null;
int dnsANum = 0;
String dnsAAAA = null;
int dnsAAAANum = 0;
String dnsCNAME = null;
int dnsCNAMENum = 0;
String dnsNs = null;
int dnsNsNum = 0;
String dnsMx = null;
int dnsMxNum = 0;
for (Object res : responseArray) {
Map<String, Object> resMap = (Map<String, Object>) res;
String type = StrUtil.toString(resMap.get("res_type"));
String body = StrUtil.toString(resMap.get("res_body"));
if (DnsType.A.getCode().equals(type)) {
dnsA = Joiner.on(",").skipNulls().join(dnsA, body);
dnsANum++;
} else if (DnsType.AAAA.getCode().equals(type)) {
dnsAAAA = Joiner.on(",").skipNulls().join(dnsAAAA, body);
dnsAAAANum++;
} else if (DnsType.CNAME.getCode().equals(type)) {
dnsCNAME = Joiner.on(",").skipNulls().join(dnsCNAME, body);
dnsCNAMENum++;
} else if (DnsType.NS.getCode().equals(type)) {
dnsNs = Joiner.on(",").skipNulls().join(dnsNs, body);
dnsNsNum++;
} else if (DnsType.MX.getCode().equals(type)) {
dnsMx = Joiner.on(",").skipNulls().join(dnsMx, body);
dnsMxNum++;
rawLog.put("capture_time", TypeUtils.coverMSToS(rawLog.get("capture_time")));
if (Objects.nonNull(rawLog.get("response"))) {
List<Map<String, Object>> response = (List<Map<String, Object>>) rawLog.get("response");
String dnsA = "";
int dnsANum = 0;
String dnsAAAA = "";
int dnsAAAANum = 0;
String dnsCNAME = "";
int dnsCNAMENum = 0;
String dnsNs = "";
int dnsNsNum = 0;
String dnsMx = "";
int dnsMxNum = 0;
for (Map<String, Object> resMap : response) {
String type = StrUtil.toString(resMap.get("res_type"));
String body = StrUtil.toString(resMap.get("res_body"));
if (DnsType.A.getCode().equals(type)) {
dnsA = Joiner.on(",").skipNulls().join(dnsA, body);
dnsANum++;
} else if (DnsType.AAAA.getCode().equals(type)) {
dnsAAAA = Joiner.on(",").skipNulls().join(dnsAAAA, body);
dnsAAAANum++;
} else if (DnsType.CNAME.getCode().equals(type)) {
dnsCNAME = Joiner.on(",").skipNulls().join(dnsCNAME, body);
dnsCNAMENum++;
} else if (DnsType.NS.getCode().equals(type)) {
dnsNs = Joiner.on(",").skipNulls().join(dnsNs, body);
dnsNsNum++;
} else if (DnsType.MX.getCode().equals(type)) {
dnsMx = Joiner.on(",").skipNulls().join(dnsMx, body);
dnsMxNum++;
}
}
rawLog.put("response", JSONUtil.toJsonStr(response));
// Records of the same type are merged by concatenation, and the per-type count is incremented
rawLog.put("dns_a", dnsA);
rawLog.put("dns_a_num", dnsANum);
rawLog.put("dns_aaaa", dnsAAAA);
rawLog.put("dns_aaaa_num", dnsAAAANum);
rawLog.put("dns_cname", dnsCNAME);
rawLog.put("dns_cname_num", dnsCNAMENum);
rawLog.put("dns_ns", dnsNs);
rawLog.put("dns_ns_num", dnsNsNum);
rawLog.put("dns_mx", dnsMx);
rawLog.put("dns_mx_num", dnsMxNum);
}
// Records of the same type are merged by concatenation, and the per-type count is incremented
rawLog.put("dns_a", dnsA);
rawLog.put("dns_a_num", dnsANum);
rawLog.put("dns_aaaa", dnsAAAA);
rawLog.put("dns_aaaa_num", dnsAAAANum);
rawLog.put("dns_cname", dnsCNAME);
rawLog.put("dns_cname_num", dnsCNAMENum);
rawLog.put("dns_ns", dnsNs);
rawLog.put("dns_ns_num", dnsNsNum);
rawLog.put("dns_mx", dnsMx);
rawLog.put("dns_mx_num", dnsMxNum);
} catch (Exception e) {
logger.error("dns 原始日志拆分 response 失败 {}", e.getMessage());
}
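A self-contained illustration of the aggregation loop above, using hypothetical A-record bodies and Guava's Joiner as imported in this class. Because each accumulator is now seeded with an empty string rather than null, skipNulls() keeps the seed, so the joined value carries a leading comma:

import com.google.common.base.Joiner;

import java.util.Arrays;
import java.util.List;

public class DnsJoinExample {
    public static void main(String[] args) {
        // Hypothetical res_body values of two A records from one DNS response.
        List<String> aBodies = Arrays.asList("1.2.3.4", "5.6.7.8");
        String dnsA = "";   // seeded with "" as in DnsMapFunction
        int dnsANum = 0;
        for (String body : aBodies) {
            dnsA = Joiner.on(",").skipNulls().join(dnsA, body);
            dnsANum++;
        }
        System.out.println(dnsA);    // ",1.2.3.4,5.6.7.8"
        System.out.println(dnsANum); // 2
    }
}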

SketchTimeMapFunction.java (new file)

@@ -0,0 +1,15 @@
package com.zdjizhi.etl.dns;
import com.zdjizhi.utils.json.TypeUtils;
import org.apache.flink.api.common.functions.MapFunction;
import java.util.Map;
public class SketchTimeMapFunction implements MapFunction<Map<String, Object>, Map<String, Object>> {
@Override
public Map<String, Object> map(Map<String, Object> value) throws Exception {
value.put("sketch_start_time", TypeUtils.coverMSToS(value.get("sketch_start_time")));
return value;
}
}

LogFlowWriteTopology.java

@@ -40,49 +40,19 @@ public class LogFlowWriteTopology {
if (FlowWriteConfig.LOG_TYPE == 1) {
//connection
DataStream<Map<String, Object>> connSource = env.addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_CONNECTION))
.setParallelism(FlowWriteConfig.SOURCE_PARALLELISM)
.setParallelism(SOURCE_PARALLELISM)
.filter(Objects::nonNull)
.map(new ConnTimeMapFunction())
.setParallelism(SOURCE_PARALLELISM)
.name(SOURCE_KAFKA_TOPIC_CONNECTION);
//sketch
DataStream<Map<String, Object>> sketchSource = env.addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_SKETCH))
.filter(Objects::nonNull)
.map(new SketchTimeMapFunction())
.setParallelism(FlowWriteConfig.SOURCE_PARALLELISM)
.name(SOURCE_KAFKA_TOPIC_SKETCH);
//transform
DataStream<Map<String, Object>> connTransformStream = connSource
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_DELAY_TIME))
.withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("conn_start_time")) * 1000))
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM)
.keyBy(new IpKeysSelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION)))
.process(new ConnProcessFunction())
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM)
.filter(x -> Objects.nonNull(x))
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
DataStream<Map<String, Object>> sketchTransformStream = sketchSource.assignTimestampsAndWatermarks(WatermarkStrategy
.<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_DELAY_TIME))
.withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("sketch_start_time")) * 1000))
.keyBy(new IpKeysSelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION)))
.process(new SketchProcessFunction())
.filter(Objects::nonNull)
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
// Write to ArangoDB
DataStream<Map<String, Object>> ip2ipGraph = connTransformStream.union(sketchTransformStream)
.keyBy(new IpKeysSelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
.process(new Ip2IpGraphProcessFunction())
.setParallelism(TRANSFORM_PARALLELISM)
.filter(Objects::nonNull)
.setParallelism(TRANSFORM_PARALLELISM);
// Write to the ClickHouse sink, in batches
if (SINK_CK_RAW_LOG_INSERT_OPEN == 1) {
connSource.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
@@ -98,14 +68,47 @@ public class LogFlowWriteTopology {
.addSink(new ClickhouseSink(SINK_CK_TABLE_SKETCH))
.setParallelism(FlowWriteConfig.SINK_PARALLELISM)
.name("CKSink");
}
sketchTransformStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
//transform
DataStream<Map<String, Object>> connTransformStream = connSource
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_DELAY_TIME))
.withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("conn_start_time")) * 1000))
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM)
.keyBy(new IpKeysSelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION)))
.process(new ConnProcessFunction())
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM)
.filter(x -> Objects.nonNull(x))
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
connTransformStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
.trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_RELATION_CONNECTION, CK_BATCH, TimeCharacteristic.ProcessingTime))
.apply(new CKBatchWindow())
.addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_CONNECTION))
.setParallelism(FlowWriteConfig.SINK_PARALLELISM)
.name("CKSink");
DataStream<Map<String, Object>> sketchTransformStream = sketchSource.assignTimestampsAndWatermarks(WatermarkStrategy
.<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_DELAY_TIME))
.withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("sketch_start_time")) * 1000))
.keyBy(new IpKeysSelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION)))
.process(new SketchProcessFunction())
.filter(Objects::nonNull)
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
// Write to ArangoDB
DataStream<Map<String, Object>> ip2ipGraph = connTransformStream.union(sketchTransformStream)
.keyBy(new IpKeysSelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
.process(new Ip2IpGraphProcessFunction())
.setParallelism(TRANSFORM_PARALLELISM)
.filter(Objects::nonNull)
.setParallelism(TRANSFORM_PARALLELISM);
// Write to ArangoDB
ip2ipGraph.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_ARANGODB_BATCH_DELAY_TIME)))
.trigger(new CountTriggerWithTimeout<>(R_VISIT_IP2IP, ARANGODB_BATCH, TimeCharacteristic.ProcessingTime))
@@ -121,19 +124,6 @@ public class LogFlowWriteTopology {
.setParallelism(FlowWriteConfig.SOURCE_PARALLELISM)
.name(FlowWriteConfig.SOURCE_KAFKA_TOPIC_DNS);
DataStream<Map<String, Object>> dnsTransform = dnsSource.assignTimestampsAndWatermarks(WatermarkStrategy
.<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
.withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("capture_time")) * 1000))
.setParallelism(TRANSFORM_PARALLELISM)
.flatMap(new DnsSplitFlatMapFunction())
.setParallelism(TRANSFORM_PARALLELISM)
.keyBy(new DnsGraphKeysSelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
.process(new DnsRelationProcessFunction())
.setParallelism(TRANSFORM_PARALLELISM)
.filter(Objects::nonNull)
.setParallelism(TRANSFORM_PARALLELISM);
// Write raw DNS logs to ClickHouse
if (SINK_CK_RAW_LOG_INSERT_OPEN == 1) {
dnsSource.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
@@ -144,8 +134,22 @@ public class LogFlowWriteTopology {
.name("CKSink");
}
DataStream<Map<String, Object>> dnsTransform = dnsSource.filter(x -> Objects.nonNull(x.get("response")))
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
.withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("capture_time")) * 1000))
.setParallelism(TRANSFORM_PARALLELISM)
.flatMap(new DnsSplitFlatMapFunction())
.setParallelism(TRANSFORM_PARALLELISM)
.keyBy(new DnsGraphKeysSelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
.process(new DnsRelationProcessFunction())
.setParallelism(TRANSFORM_PARALLELISM)
.filter(Objects::nonNull)
.setParallelism(TRANSFORM_PARALLELISM);
// Write the split DNS relation logs to ClickHouse
dnsTransform.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
dnsTransform.filter(Objects::nonNull).windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
.trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_RELATION_DNS, CK_BATCH, TimeCharacteristic.ProcessingTime))
.apply(new CKBatchWindow())
.addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_DNS))
@@ -153,13 +157,13 @@ public class LogFlowWriteTopology {
.name("CKSink");
// ArangoDB insert: group by record_type into different tables
DataStream<Map<String, Object>> dnsGraph = dnsTransform.keyBy(new DnsGraphKeysSelector())
DataStream<Map<String, Object>> dnsGraph = dnsTransform.filter(Objects::nonNull).keyBy(new DnsGraphKeysSelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
.process(new DnsGraphProcessFunction())
.setParallelism(SINK_PARALLELISM);
for (DnsType dnsEnum : DnsType.values()) {
dnsGraph.filter(x -> ObjectUtil.equal(dnsEnum.getType(), x.get("record_type")))
dnsGraph.filter(x -> Objects.nonNull(x) && ObjectUtil.equal(dnsEnum.getType(), x.get("record_type")))
.setParallelism(SINK_PARALLELISM)
.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_ARANGODB_BATCH_DELAY_TIME)))
.trigger(new CountTriggerWithTimeout<>(dnsEnum.getSink(), ARANGODB_BATCH, TimeCharacteristic.ProcessingTime))

ClickhouseSink.java

@@ -1,5 +1,6 @@
package com.zdjizhi.utils.ck;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.io.IoUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.log.Log;
@@ -48,7 +49,6 @@ public class ClickhouseSink extends RichSinkFunction<List<Map<String, Object>>>
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
try {
ClickHouseProperties properties = new ClickHouseProperties();
@@ -78,20 +78,22 @@ public class ClickhouseSink extends RichSinkFunction<List<Map<String, Object>>>
public void executeInsert(List<Map<String, Object>> data, String tableName) {
try {
List<String> keys = new LinkedList<>(data.get(0).keySet());
connection.setAutoCommit(false);
preparedStatement = connection.prepareStatement(preparedSql(keys, tableName));
int count = 0;
for (Map<String, Object> map : data) {
List<String> keys = new LinkedList<>(map.keySet());
preparedStatement = connection.prepareStatement(preparedSql(keys, tableName));
List<Object> values = new LinkedList<>(map.values());
for (int i = 1; i <= values.size(); i++) {
Object val = values.get(i - 1);
if (val instanceof Long) {
preparedStatement.setLong((i), Long.valueOf(StrUtil.toString(val)));
preparedStatement.setLong((i), Convert.toLong(val));
} else if (val instanceof Integer) {
preparedStatement.setInt((i), Integer.valueOf(StrUtil.toString(val)));
preparedStatement.setLong((i), Convert.toLong(val));
} else if (val instanceof Boolean) {
preparedStatement.setBoolean((i), Boolean.valueOf(StrUtil.toString(val)));
preparedStatement.setInt((i), Boolean.valueOf(StrUtil.toString(val)) ? 1 : 0);
} else {
preparedStatement.setString((i), StrUtil.toString(val));
}

SnowflakeId.java (deleted)

@@ -1,213 +0,0 @@
package com.zdjizhi.utils.general;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.zookeeper.DistributedLock;
import com.zdjizhi.utils.zookeeper.ZookeeperUtils;
/**
* Snowflake ID generator
*
* @author qidaijie
*/
public class SnowflakeId {
private static final Log logger = LogFactory.get();
/**
* 64 bits in total; the first bit is the sign bit and defaults to 0.
* Timestamp: 39 bits (~17 years); centerId (one per environment/job): 5 bits (0-31);
* workerId (one per process): 8 bits (0-255); sequence: 11 bits (2047/ms).
*
* sequences per ms = (-1L ^ (-1L << 11))
* max usable years = (1L << 39) / (1000L * 60 * 60 * 24 * 365)
*/
/**
* Epoch start (2020-11-14 00:00:00), max ~17 years
*/
private final long twepoch = 1605283200000L;
/**
* Number of bits for the worker id
*/
private final long workerIdBits = 8L;
/**
* Number of bits for the data-center id
*/
private final long dataCenterIdBits = 5L;
/**
* Maximum supported worker id, 255 for 8 bits (this shift trick quickly yields the largest value representable in n bits)
* M << n = M * 2^n
*/
private final long maxWorkerId = -1L ^ (-1L << workerIdBits);
/**
* Maximum supported data-center id: 31
*/
private final long maxDataCenterId = -1L ^ (-1L << dataCenterIdBits);
/**
* Number of bits for the sequence
*/
private final long sequenceBits = 11L;
/**
* Worker id shift (left by the sequence bits, 11)
*/
private final long workerIdShift = sequenceBits;
/**
* Data-center id shift (sequence bits + worker id bits = 19)
*/
private final long dataCenterIdShift = sequenceBits + workerIdBits;
/**
* Timestamp shift (sequence + worker + data-center bits = 24)
*/
private final long timestampLeftShift = sequenceBits + workerIdBits + dataCenterIdBits;
/**
* Mask for the sequence, 2047 here
*/
private final long sequenceMask = -1L ^ (-1L << sequenceBits);
/**
* Worker id (0~255)
*/
private long workerId;
/**
* Data-center id (0~31)
*/
private long dataCenterId;
/**
* Sequence within the millisecond (0~2047)
*/
private long sequence = 0L;
/**
* Timestamp of the last generated id
*/
private long lastTimestamp = -1L;
/**
* Maximum tolerated clock rollback: 10 s
*/
private static final long rollBackTime = 10000L;
private static SnowflakeId idWorker;
private static ZookeeperUtils zookeeperUtils = new ZookeeperUtils();
static {
idWorker = new SnowflakeId(FlowWriteConfig.ZOOKEEPER_SERVERS, FlowWriteConfig.DATA_CENTER_ID_NUM);
}
//==============================Constructors=====================================
/**
* Constructor
*/
private SnowflakeId(String zookeeperIp, long dataCenterIdNum) {
DistributedLock lock = new DistributedLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "disLocks1");
try {
lock.lock();
int tmpWorkerId = zookeeperUtils.modifyNode("/Snowflake/" + "worker" + dataCenterIdNum, zookeeperIp);
if (tmpWorkerId > maxWorkerId || tmpWorkerId < 0) {
throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
}
if (dataCenterIdNum > maxDataCenterId || dataCenterIdNum < 0) {
throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than ", maxDataCenterId));
}
this.workerId = tmpWorkerId;
this.dataCenterId = dataCenterIdNum;
} catch (RuntimeException e) {
logger.error("This is not usual error!!!===>>>" + e + "<<<===");
}finally {
lock.unlock();
}
}
// ==============================Methods==========================================
/**
* Get the next id (this method is thread-safe)
*
* @return the next id
*/
private synchronized long nextId() {
long timestamp = timeGen();
// Allow a bounded clock rollback: if the clock went back by less than rollBackTime, wait for it to catch up
if (lastTimestamp - timestamp > 0 && lastTimestamp - timestamp < rollBackTime) {
timestamp = tilNextMillis(lastTimestamp);
}
// If the current time is still earlier than the last timestamp, the system clock moved backwards; throw an exception
if (timestamp < lastTimestamp) {
throw new RuntimeException(
String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
}
// Same millisecond as the last id: advance the in-millisecond sequence
if (lastTimestamp == timestamp) {
sequence = (sequence + 1) & sequenceMask;
// The sequence overflowed within this millisecond
if (sequence == 0) {
// Block until the next millisecond to get a new timestamp
timestamp = tilNextMillis(lastTimestamp);
}
}
// The timestamp changed: reset the in-millisecond sequence
else {
sequence = 0L;
}
// Remember the timestamp of this id
lastTimestamp = timestamp;
// Shift the parts and OR them together into a 64-bit id
return ((timestamp - twepoch) << timestampLeftShift)
| (dataCenterId << dataCenterIdShift)
| (workerId << workerIdShift)
| sequence;
}
/**
* Block until the next millisecond, i.e. until a new timestamp is obtained
*
* @param lastTimestamp timestamp of the last generated id
* @return the current timestamp
*/
protected long tilNextMillis(long lastTimestamp) {
long timestamp = timeGen();
while (timestamp <= lastTimestamp) {
timestamp = timeGen();
}
return timestamp;
}
/**
* Return the current time in milliseconds
*
* @return current time (ms)
*/
protected long timeGen() {
return System.currentTimeMillis();
}
/**
* Static accessor
*
* @return a newly generated id
*/
public static Long generateId() {
return idWorker.nextId();
}
}

TransFormMap.java

@@ -65,9 +65,9 @@ public class TransFormMap {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getCurrentTime());
}
break;
case "snowflake_id":
JsonParseUtil.setValue(jsonMap, appendToKeyName, SnowflakeId.generateId());
break;
// case "snowflake_id":
// JsonParseUtil.setValue(jsonMap, appendToKeyName, SnowflakeId.generateId());
// break;
case "geo_ip_detail":
if (logValue != null && appendTo == null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpDetail(logValue.toString()));

TransFormTypeMap.java

@@ -65,11 +65,11 @@ public class TransFormTypeMap {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getCurrentTime());
}
break;
case "snowflake_id":
JsonParseUtil.setValue(jsonMap, appendToKeyName, SnowflakeId.generateId());
// Not implemented in this release; planned for TSG-22.01
// JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getDecimalHash(SnowflakeId.generateId()));
break;
// case "snowflake_id":
// JsonParseUtil.setValue(jsonMap, appendToKeyName, SnowflakeId.generateId());
// // Not implemented in this release; planned for TSG-22.01
//// JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getDecimalHash(SnowflakeId.generateId()));
// break;
case "geo_ip_detail":
if (logValue != null && appendToKeyValue == null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpDetail(logValue.toString()));

HttpClientUtil.java (deleted)

@@ -1,77 +0,0 @@
package com.zdjizhi.utils.http;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
/**
* Utility class for fetching the schema from the gateway
*
* @author qidaijie
*/
public class HttpClientUtil {
private static final Log logger = LogFactory.get();
/**
* Request the schema from the gateway
*
* @param http gateway URL
* @return schema
*/
public static String requestByGetMethod(String http) {
CloseableHttpClient httpClient = HttpClients.createDefault();
StringBuilder entityStringBuilder;
HttpGet get = new HttpGet(http);
BufferedReader bufferedReader = null;
CloseableHttpResponse httpResponse = null;
try {
httpResponse = httpClient.execute(get);
HttpEntity entity = httpResponse.getEntity();
entityStringBuilder = new StringBuilder();
if (null != entity) {
bufferedReader = new BufferedReader(new InputStreamReader(httpResponse.getEntity().getContent(), "UTF-8"), 8 * 1024);
int intC;
while ((intC = bufferedReader.read()) != -1) {
char c = (char) intC;
if (c == '\n') {
break;
}
entityStringBuilder.append(c);
}
return entityStringBuilder.toString();
}
} catch (IOException e) {
logger.error("Get Schema from Query engine ERROR! Exception message is:" + e);
} finally {
if (httpClient != null) {
try {
httpClient.close();
} catch (IOException e) {
logger.error("Close HTTP Client ERROR! Exception messgae is:" + e);
}
}
if (httpResponse != null) {
try {
httpResponse.close();
} catch (IOException e) {
logger.error("Close httpResponse ERROR! Exception messgae is:" + e);
}
}
if (bufferedReader != null) {
IOUtils.closeQuietly(bufferedReader);
}
}
return "";
}
}

TypeUtils.java

@@ -1,11 +1,15 @@
package com.zdjizhi.utils.json;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.util.StrUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.StringUtil;
import com.zdjizhi.utils.exception.FlowWriteException;
import java.util.concurrent.TimeUnit;
/**
* @author qidaijie
* @Package PACKAGE_NAME
@@ -168,4 +172,17 @@ public class TypeUtils {
throw new FlowWriteException("can not cast to long, value : " + value);
}
public static long coverMSToS(long ms) {
if (ms > 9_999_999_999L) {
return Convert.convertTime(ms, TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
}
return ms;
}
public static Object coverMSToS(Object ms) {
if (StrUtil.toString(ms).length() == 13) {
return StrUtil.sub(StrUtil.toString(ms), 0, 10);
}
return ms;
}
}
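A quick, self-contained check of how the two coverMSToS overloads added above behave. The logic is restated here with plain JDK calls instead of the Hutool helpers so it runs on its own; the timestamp values are hypothetical:

import java.util.concurrent.TimeUnit;

public class CoverMsToSExample {
    // Restates the long overload: values beyond 10 digits are treated as
    // milliseconds and converted to seconds.
    static long coverMSToS(long ms) {
        return ms > 9_999_999_999L ? TimeUnit.MILLISECONDS.toSeconds(ms) : ms;
    }

    // Restates the Object overload: a 13-character value is assumed to be a
    // millisecond timestamp and is truncated to its first 10 characters.
    static Object coverMSToS(Object ms) {
        String s = String.valueOf(ms);
        return s.length() == 13 ? s.substring(0, 10) : ms;
    }

    public static void main(String[] args) {
        System.out.println(coverMSToS(1658476507000L));           // 1658476507
        System.out.println(coverMSToS(1658476507L));              // 1658476507 (already seconds)
        System.out.println(coverMSToS((Object) 1658476507000L));  // "1658476507"
    }
}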

KafkaProducer.java

@@ -46,5 +46,4 @@ public class KafkaProducer {
return kafkaProducer;
}
}

DistributedLock.java (deleted)

@@ -1,190 +0,0 @@
package com.zdjizhi.utils.zookeeper;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* @author qidaijie
*/
public class DistributedLock implements Lock, Watcher {
private static final Log logger = LogFactory.get();
private ZooKeeper zk = null;
/**
* Root node
*/
private final String ROOT_LOCK = "/locks";
/**
* The resource being contended for
*/
private String lockName;
/**
* The previous lock node to wait on
*/
private String waitLock;
/**
* The current lock node
*/
private String currentLock;
/**
* Countdown latch used to wait on the previous node
*/
private CountDownLatch countDownLatch;
private int sessionTimeout = 2000;
private List<Exception> exceptionList = new ArrayList<Exception>();
/**
* Configure the distributed lock
*
* @param config   ZooKeeper connection string
* @param lockName contended resource
*/
public DistributedLock(String config, String lockName) {
this.lockName = lockName;
try {
// Connect to ZooKeeper
zk = new ZooKeeper(config, sessionTimeout, this);
Stat stat = zk.exists(ROOT_LOCK, false);
if (stat == null) {
// Create the root node if it does not exist
zk.create(ROOT_LOCK, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (IOException | InterruptedException | KeeperException e) {
logger.error("Node already exists!");
}
}
// Node watcher
@Override
public void process(WatchedEvent event) {
if (this.countDownLatch != null) {
this.countDownLatch.countDown();
}
}
@Override
public void lock() {
if (exceptionList.size() > 0) {
throw new LockException(exceptionList.get(0));
}
try {
if (this.tryLock()) {
logger.info(Thread.currentThread().getName() + " " + lockName + " acquired the lock");
} else {
// Wait for the lock
waitForLock(waitLock, sessionTimeout);
}
} catch (InterruptedException | KeeperException e) {
logger.error("获取锁异常" + e);
}
}
@Override
public boolean tryLock() {
try {
String splitStr = "_lock_";
if (lockName.contains(splitStr)) {
throw new LockException("锁名有误");
}
// Create an ephemeral sequential node
currentLock = zk.create(ROOT_LOCK + "/" + lockName + splitStr, new byte[0],
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// Fetch all child nodes
List<String> subNodes = zk.getChildren(ROOT_LOCK, false);
// Collect all locks for this lockName
List<String> lockObjects = new ArrayList<String>();
for (String node : subNodes) {
String tmpNode = node.split(splitStr)[0];
if (tmpNode.equals(lockName)) {
lockObjects.add(node);
}
}
Collections.sort(lockObjects);
// If the current node is the smallest one, the lock is acquired
if (currentLock.equals(ROOT_LOCK + "/" + lockObjects.get(0))) {
return true;
}
// Otherwise, find the node immediately before this one
String prevNode = currentLock.substring(currentLock.lastIndexOf("/") + 1);
waitLock = lockObjects.get(Collections.binarySearch(lockObjects, prevNode) - 1);
} catch (InterruptedException | KeeperException e) {
logger.error("获取锁过程异常" + e);
}
return false;
}
@Override
public boolean tryLock(long timeout, TimeUnit unit) {
try {
if (this.tryLock()) {
return true;
}
return waitForLock(waitLock, timeout);
} catch (KeeperException | InterruptedException | RuntimeException e) {
logger.error("判断是否锁定异常" + e);
}
return false;
}
// Wait for the lock
private boolean waitForLock(String prev, long waitTime) throws KeeperException, InterruptedException {
Stat stat = zk.exists(ROOT_LOCK + "/" + prev, true);
if (stat != null) {
this.countDownLatch = new CountDownLatch(1);
// Wait on the latch; when the previous node disappears, process() counts down and the wait ends, so the lock can be acquired
this.countDownLatch.await(waitTime, TimeUnit.MILLISECONDS);
this.countDownLatch = null;
}
return true;
}
@Override
public void unlock() {
try {
zk.delete(currentLock, -1);
currentLock = null;
zk.close();
} catch (InterruptedException | KeeperException e) {
logger.error("关闭锁异常" + e);
}
}
@Override
public Condition newCondition() {
return null;
}
@Override
public void lockInterruptibly() throws InterruptedException {
this.lock();
}
public class LockException extends RuntimeException {
private static final long serialVersionUID = 1L;
public LockException(String e) {
super(e);
}
public LockException(Exception e) {
super(e);
}
}
}

ZookeeperUtils.java (deleted)

@@ -1,140 +0,0 @@
package com.zdjizhi.utils.zookeeper;
import cn.hutool.core.util.StrUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* @author qidaijie
* @Package cn.ac.iie.utils.zookeeper
* @Description:
* @date 2020/11/14 11:28
*/
public class ZookeeperUtils implements Watcher {
private static final Log logger = LogFactory.get();
private static final int ID_MAX = 255;
private ZooKeeper zookeeper;
private static final int SESSION_TIME_OUT = 20000;
private CountDownLatch countDownLatch = new CountDownLatch(1);
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
}
}
/**
* Modify node data
*
* @param path node path
*/
public int modifyNode(String path, String zookeeperIp) {
createNode(path, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, zookeeperIp);
int workerId = 0;
try {
connectZookeeper(zookeeperIp);
Stat stat = zookeeper.exists(path, true);
workerId = Integer.parseInt(getNodeDate(path));
if (workerId > ID_MAX) {
workerId = 0;
zookeeper.setData(path, "1".getBytes(), stat.getVersion());
} else {
String result = String.valueOf(workerId + 1);
if (stat != null) {
zookeeper.setData(path, result.getBytes(), stat.getVersion());
} else {
logger.error("Node does not exist!,Can't modify");
}
}
} catch (KeeperException | InterruptedException e) {
logger.error("modify error Can't modify," + e);
} finally {
closeConn();
}
logger.warn("workerID is" + workerId);
return workerId;
}
/**
* Connect to ZooKeeper
*
* @param host address
*/
public void connectZookeeper(String host) {
try {
zookeeper = new ZooKeeper(host, SESSION_TIME_OUT, this);
countDownLatch.await();
} catch (IOException | InterruptedException e) {
logger.error("Connection to the Zookeeper Exception! message:" + e);
}
}
/**
* Close the connection
*/
public void closeConn() {
try {
if (zookeeper != null) {
zookeeper.close();
}
} catch (InterruptedException e) {
logger.error("Close the Zookeeper connection Exception! message:" + e);
}
}
/**
* Get node data
*
* @param path node path
* @return the data, or null on error
*/
public String getNodeDate(String path) {
String result = null;
Stat stat = new Stat();
try {
byte[] resByte = zookeeper.getData(path, true, stat);
result = StrUtil.str(resByte, "UTF-8");
} catch (KeeperException | InterruptedException e) {
logger.error("Get node information exception" + e);
}
return result;
}
/**
* @param path path at which to create the node
* @param date byte[] data to store in the node
* @param acls ACL policy controlling access
*/
public void createNode(String path, byte[] date, List<ACL> acls, String zookeeperIp) {
try {
connectZookeeper(zookeeperIp);
Stat exists = zookeeper.exists(path, true);
if (exists == null) {
Stat existsSnowflakeld = zookeeper.exists("/Snowflake", true);
if (existsSnowflakeld == null) {
zookeeper.create("/Snowflake", null, acls, CreateMode.PERSISTENT);
}
zookeeper.create(path, date, acls, CreateMode.PERSISTENT);
} else {
logger.warn("Node already exists ! Don't need to create");
}
} catch (KeeperException | InterruptedException e) {
logger.error(e);
} finally {
closeConn();
}
}
}