diff --git a/pom.xml b/pom.xml index 7cdc355..221db19 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.zdjizhi knowledge-log - 202207 + 20220722 log-completion-schema http://www.example.com @@ -174,23 +174,6 @@ ${scope.type} - - - org.apache.zookeeper - zookeeper - 3.4.10 - - - slf4j-log4j12 - org.slf4j - - - log4j-over-slf4j - org.slf4j - - - - cglib cglib-nodep @@ -204,12 +187,6 @@ compile - - org.apache.httpcomponents - httpclient - 4.5.2 - - com.jayway.jsonpath json-path diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index 7fef174..bffc433 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -1,6 +1,6 @@ #--------------------------------\u5730\u5740\u914D\u7F6E------------------------------# #\u7BA1\u7406kafka\u5730\u5740,\u591A\u53F0\u9017\u53F7\u8FDE\u63A5ip1:9094,ip2:9094 -source.kafka.servers=192.168.45.102:9094 +source.kafka.servers=192.168.44.85:9094,192.168.44.86:9094,192.168.44.87:9094 #\u7BA1\u7406\u8F93\u51FAkafka\u5730\u5740 sink.kafka.servers= @@ -8,14 +8,14 @@ sink.kafka.servers= tools.library= #--------------------------------Kafka\u6D88\u8D39/\u751F\u4EA7\u914D\u7F6E------------------------------# #\u8BFB\u53D6topic,\u5B58\u50A8\u8BE5spout id\u7684\u6D88\u8D39offset\u4FE1\u606F\uFF0C\u53EF\u901A\u8FC7\u8BE5\u62D3\u6251\u547D\u540D;\u5177\u4F53\u5B58\u50A8offset\u7684\u4F4D\u7F6E\uFF0C\u786E\u5B9A\u4E0B\u6B21\u8BFB\u53D6\u4E0D\u91CD\u590D\u7684\u6570\u636E\uFF1B -group.id=KNOWLEDGE-GROUP +group.id=KNOWLEDGE-GROUP3 #--------------------------------topology\u914D\u7F6E------------------------------# #consumer \u5E76\u884C\u5EA6 -source.parallelism=12 +source.parallelism=1 #\u8F6C\u6362\u51FD\u6570\u5E76\u884C\u5EA6 -transform.parallelism=12 +transform.parallelism=1 #kafka producer \u5E76\u884C\u5EA6 -sink.parallelism=12 +sink.parallelism=1 #--------------------------------\u4E1A\u52A1\u914D\u7F6E------------------------------# #1 connection\u65E5\u5FD7 \uFF0C2 dns\u65E5\u5FD7 @@ -24,7 +24,7 @@ log.type=1 #\u751F\u4EA7\u8005\u538B\u7F29\u6A21\u5F0F none or snappy producer.kafka.compression.type=none #kafka\u6570\u636E\u6E90topic -source.kafka.topic.connection=CONNECTION-RECORD-LOG +source.kafka.topic.connection=test12 source.kafka.topic.sketch=CONNECTION-SKETCH-RECORD-LOG source.kafka.topic.dns=DNS-RECORD-LOG #\u5199\u5165clickhouse\u672C\u5730\u8868 @@ -40,10 +40,10 @@ sink.arangodb.table.r.mx.domain2domain=R_MX_DOMAIN2DOMAIN sink.arangodb.table.r.resolve.domain2ip=R_RESOLVE_DOMAIN2IP sink.arangodb.table.r.nx.domain2domain=R_NX_DOMAIN2DOMAIN -#\u4F7F\u7528flink\u5165\u5E93\u539F\u59CB\u65E5\u5FD7\uFF0C\u6216\u8005\u9009\u62E9\u5B89\u88C5gohangout\u5165\u5E93 0\uFF1A\u5426\uFF0C1\uFF1A\u662F +#\u4F7F\u7528flink\u5165\u5E93\u539F\u59CB\u65E5\u5FD70\uFF1A\u5426\uFF0C1\uFF1A\u662F sink.ck.raw.log.insert.open=1 #clickhouse\u914D\u7F6E\uFF0C\u591A\u4E2A\u9017\u53F7\u8FDE\u63A5 ip1:8123,ip2:8123 -ck.hosts=192.168.45.102:8123,192.168.45.102:8123 +ck.hosts=192.168.44.85:8123,192.168.44.86:8123,192.168.44.87:8123 ck.database=tsg_galaxy_v3 ck.username=default ck.pin=galaxy2019 @@ -53,7 +53,7 @@ ck.socket.timeout=300000 #clickhouse\u5165\u5E93\u6279\u91CF\u5355\u4F4D\u6761 ck.batch=10 #clickhouse\u5165\u5E93\u524D\u79EF\u7D2F\u6279\u91CF\u65F6\u95F4\u5355\u4F4D\u6BEB\u79D2 -sink.ck.batch.delay.time=1000 +sink.ck.batch.delay.time=2000 #flink \u65E5\u5FD7\u5EF6\u8FDF\u8D85\u65F6\u65F6\u95F4 flink.watermark.max.delay.time=50 @@ -68,8 +68,8 @@ arangodb.port=8529 arangodb.user=root 
arangodb.password=galaxy_2019 arangodb.db.name=knowledge -arangodb.batch=100000 +arangodb.batch=10000 arangodb.ttl=3600 arangodb.thread.pool.number=10 #\u6279\u91CF\u7D2F\u8BA1\u65F6\u95F4\u5355\u4F4D\u6BEB\u79D2ms -sink.arangodb.batch.delay.time=5 \ No newline at end of file +sink.arangodb.batch.delay.time=1000 \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/enums/DnsType.java b/src/main/java/com/zdjizhi/enums/DnsType.java index a5c5586..95776c8 100644 --- a/src/main/java/com/zdjizhi/enums/DnsType.java +++ b/src/main/java/com/zdjizhi/enums/DnsType.java @@ -9,11 +9,11 @@ import static com.zdjizhi.common.FlowWriteConfig.*; **/ public enum DnsType { //对应dns类型,编码,入库表 - A("a", "0x0001", R_RESOLVE_DOMAIN2IP), - AAAA("aaaa", "0x001c", R_RESOLVE_DOMAIN2IP), - CNAME("cname", "0x0005", R_CNAME_DOMAIN2DOMAIN), - MX("mx", "0x000f", R_MX_DOMAIN2DOMAIN), - NS("ns", "0x0002", R_NX_DOMAIN2DOMAIN); + A("a", "1", R_RESOLVE_DOMAIN2IP), + AAAA("aaaa", "28", R_RESOLVE_DOMAIN2IP), + CNAME("cname", "5", R_CNAME_DOMAIN2DOMAIN), + MX("mx", "15", R_MX_DOMAIN2DOMAIN), + NS("ns", "2", R_NX_DOMAIN2DOMAIN); private String type; private String code; diff --git a/src/main/java/com/zdjizhi/etl/CountTriggerWithTimeout.java b/src/main/java/com/zdjizhi/etl/CountTriggerWithTimeout.java index ba348bd..66b36a9 100644 --- a/src/main/java/com/zdjizhi/etl/CountTriggerWithTimeout.java +++ b/src/main/java/com/zdjizhi/etl/CountTriggerWithTimeout.java @@ -54,7 +54,6 @@ public class CountTriggerWithTimeout extends Trigger { this.stateName = stateName; } - private TriggerResult fireAndPurge(TimeWindow window, TriggerContext ctx) throws Exception { clear(window, ctx); return TriggerResult.FIRE_AND_PURGE; @@ -90,7 +89,7 @@ public class CountTriggerWithTimeout extends Trigger { if (time >= window.getEnd()) { return TriggerResult.CONTINUE; } else { - logger.info("fire with process tiem: " + time); + logger.debug("fire with process tiem: " + time); return fireAndPurge(window, ctx); } } @@ -106,7 +105,7 @@ public class CountTriggerWithTimeout extends Trigger { if (time >= window.getEnd()) { return TriggerResult.CONTINUE; } else { - logger.info("fire with event tiem: " + time); + logger.debug("fire with event tiem: " + time); return fireAndPurge(window, ctx); } } diff --git a/src/main/java/com/zdjizhi/etl/LogFormat.java b/src/main/java/com/zdjizhi/etl/LogFormat.java new file mode 100644 index 0000000..c0edaa8 --- /dev/null +++ b/src/main/java/com/zdjizhi/etl/LogFormat.java @@ -0,0 +1,19 @@ +package com.zdjizhi.etl; + +import com.zdjizhi.utils.json.TypeUtils; + +import java.util.Map; + +public class LogFormat { + + public static Map connTime(Map value) { + value.put("conn_start_time", TypeUtils.coverMSToS(value.get("conn_start_time"))); + return value; + } + + + public static Map sketchTime(Map value) { + value.put("sketch_start_time", TypeUtils.coverMSToS(value.get("sketch_start_time"))); + return value; + } +} diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java b/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java index cec2425..6fa11d6 100644 --- a/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java +++ b/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java @@ -1,6 +1,7 @@ package com.zdjizhi.etl.connection; import cn.hutool.core.convert.Convert; +import cn.hutool.core.date.DateUtil; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import org.apache.flink.api.java.tuple.Tuple2; @@ -47,8 +48,8 @@ public class ConnProcessFunction extends 
ProcessWindowFunction newSketchLog : elements) { sessions++; diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnTimeMapFunction.java b/src/main/java/com/zdjizhi/etl/connection/ConnTimeMapFunction.java new file mode 100644 index 0000000..e957d65 --- /dev/null +++ b/src/main/java/com/zdjizhi/etl/connection/ConnTimeMapFunction.java @@ -0,0 +1,16 @@ +package com.zdjizhi.etl.connection; + +import com.zdjizhi.utils.json.TypeUtils; +import org.apache.flink.api.common.functions.MapFunction; + +import java.util.Map; + +public class ConnTimeMapFunction implements MapFunction, Map> { + + @Override + public Map map(Map value) throws Exception { + value.put("conn_start_time", TypeUtils.coverMSToS(value.get("conn_start_time"))); + value.put("log_gen_time", TypeUtils.coverMSToS(value.get("log_gen_time"))); + return value; + } +} diff --git a/src/main/java/com/zdjizhi/etl/connection/Ip2IpGraphProcessFunction.java b/src/main/java/com/zdjizhi/etl/connection/Ip2IpGraphProcessFunction.java index e9dd0e2..1b1b0c9 100644 --- a/src/main/java/com/zdjizhi/etl/connection/Ip2IpGraphProcessFunction.java +++ b/src/main/java/com/zdjizhi/etl/connection/Ip2IpGraphProcessFunction.java @@ -24,10 +24,10 @@ public class Ip2IpGraphProcessFunction extends ProcessWindowFunction keys, Context context, Iterable> elements, Collector> out) { try { - long lastFoundTime = DateUtil.currentSeconds(); + long lastFoundTime = DateUtil.currentSeconds();; for (Map log : elements) { - long connStartTimetime = Convert.toLong(log.get("start_time")); - lastFoundTime = connStartTimetime > lastFoundTime ? connStartTimetime : lastFoundTime; + long connStartTime = Convert.toLong(log.get("start_time")); + lastFoundTime = connStartTime > lastFoundTime ? connStartTime : lastFoundTime; } Map newLog = new HashMap<>(); newLog.put("src_ip", keys.f0); diff --git a/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java b/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java index 698ed55..aac8428 100644 --- a/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java +++ b/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java @@ -1,6 +1,7 @@ package com.zdjizhi.etl.connection; import cn.hutool.core.convert.Convert; +import cn.hutool.core.date.DateUtil; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import org.apache.flink.api.java.tuple.Tuple2; @@ -61,8 +62,8 @@ public class SketchProcessFunction extends ProcessWindowFunction newSketchLog : elements) { sessions += Convert.toLong(newSketchLog.get("sketch_sessions")); diff --git a/src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java b/src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java index b536152..4fd104d 100644 --- a/src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java +++ b/src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java @@ -1,15 +1,17 @@ package com.zdjizhi.etl.dns; import cn.hutool.core.util.StrUtil; -import cn.hutool.json.JSONArray; import cn.hutool.json.JSONUtil; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.google.common.base.Joiner; import com.zdjizhi.enums.DnsType; +import com.zdjizhi.utils.json.TypeUtils; import org.apache.flink.api.common.functions.MapFunction; +import java.util.List; import java.util.Map; +import java.util.Objects; /** * @author zhq @@ -22,55 +24,57 @@ public class DnsMapFunction implements MapFunction, Map map(Map rawLog) throws Exception { try { - Object response = rawLog.get("response"); - JSONArray responseArray = JSONUtil.parseArray(response); - String dnsA = null; - int 
dnsANum = 0; - String dnsAAAA = null; - int dnsAAAANum = 0; - String dnsCNAME = null; - int dnsCNAMENum = 0; - String dnsNs = null; - int dnsNsNum = 0; - String dnsMx = null; - int dnsMxNum = 0; - for (Object res : responseArray) { - Map resMap = (Map) res; - String type = StrUtil.toString(resMap.get("res_type")); - String body = StrUtil.toString(resMap.get("res_body")); - if (DnsType.A.getCode().equals(type)) { - dnsA = Joiner.on(",").skipNulls().join(dnsA, body); - dnsANum++; - } else if (DnsType.AAAA.getCode().equals(type)) { - dnsAAAA = Joiner.on(",").skipNulls().join(dnsAAAA, body); - dnsAAAANum++; - } else if (DnsType.CNAME.getCode().equals(type)) { - dnsCNAME = Joiner.on(",").skipNulls().join(dnsCNAME, body); - dnsCNAMENum++; - } else if (DnsType.NS.getCode().equals(type)) { - dnsNs = Joiner.on(",").skipNulls().join(dnsNs, body); - dnsNsNum++; - } else if (DnsType.MX.getCode().equals(type)) { - dnsMx = Joiner.on(",").skipNulls().join(dnsMx, body); - dnsMxNum++; + rawLog.put("capture_time", TypeUtils.coverMSToS(rawLog.get("capture_time"))); + if (Objects.nonNull(rawLog.get("response"))) { + List> response = (List>) rawLog.get("response"); + String dnsA = ""; + int dnsANum = 0; + String dnsAAAA = ""; + int dnsAAAANum = 0; + String dnsCNAME = ""; + int dnsCNAMENum = 0; + String dnsNs = ""; + int dnsNsNum = 0; + String dnsMx = ""; + int dnsMxNum = 0; + for (Map resMap : response) { + String type = StrUtil.toString(resMap.get("res_type")); + String body = StrUtil.toString(resMap.get("res_body")); + if (DnsType.A.getCode().equals(type)) { + dnsA = Joiner.on(",").skipNulls().join(dnsA, body); + dnsANum++; + } else if (DnsType.AAAA.getCode().equals(type)) { + dnsAAAA = Joiner.on(",").skipNulls().join(dnsAAAA, body); + dnsAAAANum++; + } else if (DnsType.CNAME.getCode().equals(type)) { + dnsCNAME = Joiner.on(",").skipNulls().join(dnsCNAME, body); + dnsCNAMENum++; + } else if (DnsType.NS.getCode().equals(type)) { + dnsNs = Joiner.on(",").skipNulls().join(dnsNs, body); + dnsNsNum++; + } else if (DnsType.MX.getCode().equals(type)) { + dnsMx = Joiner.on(",").skipNulls().join(dnsMx, body); + dnsMxNum++; + } } + + rawLog.put("response", JSONUtil.toJsonStr(response)); + //获取类型,相同类型合并用,拼接,并且计数加1 + rawLog.put("dns_a", dnsA); + rawLog.put("dns_a_num", dnsANum); + + rawLog.put("dns_aaaa", dnsAAAA); + rawLog.put("dns_aaaa_num", dnsAAAANum); + + rawLog.put("dns_cname", dnsCNAME); + rawLog.put("dns_cname_num", dnsCNAMENum); + + rawLog.put("dns_ns", dnsNs); + rawLog.put("dns_ns_num", dnsNsNum); + + rawLog.put("dns_mx", dnsMx); + rawLog.put("dns_mx_num", dnsMxNum); } - - //获取类型,相同类型合并用,拼接,并且计数加1 - rawLog.put("dns_a", dnsA); - rawLog.put("dns_a_num", dnsANum); - - rawLog.put("dns_aaaa", dnsAAAA); - rawLog.put("dns_aaaa_num", dnsAAAANum); - - rawLog.put("dns_cname", dnsCNAME); - rawLog.put("dns_cname_num", dnsCNAMENum); - - rawLog.put("dns_ns", dnsNs); - rawLog.put("dns_ns_num", dnsNsNum); - - rawLog.put("dns_mx", dnsMx); - rawLog.put("dns_mx_num", dnsMxNum); } catch (Exception e) { logger.error("dns 原始日志拆分 response 失败 {}", e.getMessage()); } diff --git a/src/main/java/com/zdjizhi/etl/dns/SketchTimeMapFunction.java b/src/main/java/com/zdjizhi/etl/dns/SketchTimeMapFunction.java new file mode 100644 index 0000000..cf10a75 --- /dev/null +++ b/src/main/java/com/zdjizhi/etl/dns/SketchTimeMapFunction.java @@ -0,0 +1,15 @@ +package com.zdjizhi.etl.dns; + +import com.zdjizhi.utils.json.TypeUtils; +import org.apache.flink.api.common.functions.MapFunction; + +import java.util.Map; + +public class SketchTimeMapFunction 
implements MapFunction, Map> { + + @Override + public Map map(Map value) throws Exception { + value.put("sketch_start_time", TypeUtils.coverMSToS(value.get("sketch_start_time"))); + return value; + } +} diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java index e7ac08e..1a075cc 100644 --- a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java +++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java @@ -40,49 +40,19 @@ public class LogFlowWriteTopology { if (FlowWriteConfig.LOG_TYPE == 1) { //connection DataStream> connSource = env.addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_CONNECTION)) - .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM) + .setParallelism(SOURCE_PARALLELISM) .filter(Objects::nonNull) + .map(new ConnTimeMapFunction()) .setParallelism(SOURCE_PARALLELISM) .name(SOURCE_KAFKA_TOPIC_CONNECTION); //sketch DataStream> sketchSource = env.addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_SKETCH)) .filter(Objects::nonNull) + .map(new SketchTimeMapFunction()) .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM) .name(SOURCE_KAFKA_TOPIC_SKETCH); - //transform - DataStream> connTransformStream = connSource - .assignTimestampsAndWatermarks(WatermarkStrategy - .>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_DELAY_TIME)) - .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("conn_start_time")) * 1000)) - .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM) - .keyBy(new IpKeysSelector()) - .window(TumblingProcessingTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION))) - .process(new ConnProcessFunction()) - .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM) - .filter(x -> Objects.nonNull(x)) - .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM); - - DataStream> sketchTransformStream = sketchSource.assignTimestampsAndWatermarks(WatermarkStrategy - .>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_DELAY_TIME)) - .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("sketch_start_time")) * 1000)) - .keyBy(new IpKeysSelector()) - .window(TumblingProcessingTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION))) - .process(new SketchProcessFunction()) - .filter(Objects::nonNull) - .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM); - - - //入Arangodb - DataStream> ip2ipGraph = connTransformStream.union(sketchTransformStream) - .keyBy(new IpKeysSelector()) - .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH))) - .process(new Ip2IpGraphProcessFunction()) - .setParallelism(TRANSFORM_PARALLELISM) - .filter(Objects::nonNull) - .setParallelism(TRANSFORM_PARALLELISM); - //写入CKsink,批量处理 if (SINK_CK_RAW_LOG_INSERT_OPEN == 1) { connSource.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME))) @@ -98,14 +68,47 @@ public class LogFlowWriteTopology { .addSink(new ClickhouseSink(SINK_CK_TABLE_SKETCH)) .setParallelism(FlowWriteConfig.SINK_PARALLELISM) .name("CKSink"); + } - sketchTransformStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME))) + + //transform + DataStream> connTransformStream = connSource + .assignTimestampsAndWatermarks(WatermarkStrategy + .>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_DELAY_TIME)) + .withTimestampAssigner((event, timestamp) -> 
Convert.toLong(event.get("conn_start_time")) * 1000)) + .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM) + .keyBy(new IpKeysSelector()) + .window(TumblingProcessingTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION))) + .process(new ConnProcessFunction()) + .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM) + .filter(x -> Objects.nonNull(x)) + .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM); + + connTransformStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME))) .trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_RELATION_CONNECTION, CK_BATCH, TimeCharacteristic.ProcessingTime)) .apply(new CKBatchWindow()) .addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_CONNECTION)) .setParallelism(FlowWriteConfig.SINK_PARALLELISM) .name("CKSink"); + DataStream> sketchTransformStream = sketchSource.assignTimestampsAndWatermarks(WatermarkStrategy + .>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_DELAY_TIME)) + .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("sketch_start_time")) * 1000)) + .keyBy(new IpKeysSelector()) + .window(TumblingProcessingTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION))) + .process(new SketchProcessFunction()) + .filter(Objects::nonNull) + .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM); + + //入Arangodb + DataStream> ip2ipGraph = connTransformStream.union(sketchTransformStream) + .keyBy(new IpKeysSelector()) + .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH))) + .process(new Ip2IpGraphProcessFunction()) + .setParallelism(TRANSFORM_PARALLELISM) + .filter(Objects::nonNull) + .setParallelism(TRANSFORM_PARALLELISM); + //写入arangodb ip2ipGraph.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_ARANGODB_BATCH_DELAY_TIME))) .trigger(new CountTriggerWithTimeout<>(R_VISIT_IP2IP, ARANGODB_BATCH, TimeCharacteristic.ProcessingTime)) @@ -121,19 +124,6 @@ public class LogFlowWriteTopology { .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM) .name(FlowWriteConfig.SOURCE_KAFKA_TOPIC_DNS); - DataStream> dnsTransform = dnsSource.assignTimestampsAndWatermarks(WatermarkStrategy - .>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME)) - .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("capture_time")) * 1000)) - .setParallelism(TRANSFORM_PARALLELISM) - .flatMap(new DnsSplitFlatMapFunction()) - .setParallelism(TRANSFORM_PARALLELISM) - .keyBy(new DnsGraphKeysSelector()) - .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION))) - .process(new DnsRelationProcessFunction()) - .setParallelism(TRANSFORM_PARALLELISM) - .filter(Objects::nonNull) - .setParallelism(TRANSFORM_PARALLELISM); - //dns 原始日志 ck入库 if (SINK_CK_RAW_LOG_INSERT_OPEN == 1) { dnsSource.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME))) @@ -144,8 +134,22 @@ public class LogFlowWriteTopology { .name("CKSink"); } + DataStream> dnsTransform = dnsSource.filter(x -> Objects.nonNull(x.get("response"))) + .assignTimestampsAndWatermarks(WatermarkStrategy + .>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME)) + .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("capture_time")) * 1000)) + .setParallelism(TRANSFORM_PARALLELISM) + .flatMap(new DnsSplitFlatMapFunction()) + .setParallelism(TRANSFORM_PARALLELISM) + .keyBy(new DnsGraphKeysSelector()) + 
.window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION))) + .process(new DnsRelationProcessFunction()) + .setParallelism(TRANSFORM_PARALLELISM) + .filter(Objects::nonNull) + .setParallelism(TRANSFORM_PARALLELISM); + //dns 拆分后relation日志 ck入库 - dnsTransform.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME))) + dnsTransform.filter(Objects::nonNull).windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME))) .trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_RELATION_DNS, CK_BATCH, TimeCharacteristic.ProcessingTime)) .apply(new CKBatchWindow()) .addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_DNS)) @@ -153,13 +157,13 @@ public class LogFlowWriteTopology { .name("CKSink"); //arango 入库,按record_type分组入不同的表 - DataStream> dnsGraph = dnsTransform.keyBy(new DnsGraphKeysSelector()) + DataStream> dnsGraph = dnsTransform.filter(Objects::nonNull).keyBy(new DnsGraphKeysSelector()) .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH))) .process(new DnsGraphProcessFunction()) .setParallelism(SINK_PARALLELISM); for (DnsType dnsEnum : DnsType.values()) { - dnsGraph.filter(x -> ObjectUtil.equal(dnsEnum.getType(), x.get("record_type"))) + dnsGraph.filter(x -> Objects.nonNull(x) && ObjectUtil.equal(dnsEnum.getType(), x.get("record_type"))) .setParallelism(SINK_PARALLELISM) .windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_ARANGODB_BATCH_DELAY_TIME))) .trigger(new CountTriggerWithTimeout<>(dnsEnum.getSink(), ARANGODB_BATCH, TimeCharacteristic.ProcessingTime)) diff --git a/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java b/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java index 7f9c63d..d58190d 100644 --- a/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java +++ b/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java @@ -1,5 +1,6 @@ package com.zdjizhi.utils.ck; +import cn.hutool.core.convert.Convert; import cn.hutool.core.io.IoUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.log.Log; @@ -48,7 +49,6 @@ public class ClickhouseSink extends RichSinkFunction>> @Override public void open(Configuration parameters) throws Exception { - super.open(parameters); try { ClickHouseProperties properties = new ClickHouseProperties(); @@ -78,20 +78,22 @@ public class ClickhouseSink extends RichSinkFunction>> public void executeInsert(List> data, String tableName) { try { - List keys = new LinkedList<>(data.get(0).keySet()); connection.setAutoCommit(false); - preparedStatement = connection.prepareStatement(preparedSql(keys, tableName)); + int count = 0; for (Map map : data) { + List keys = new LinkedList<>(map.keySet()); + preparedStatement = connection.prepareStatement(preparedSql(keys, tableName)); + List values = new LinkedList<>(map.values()); for (int i = 1; i <= values.size(); i++) { Object val = values.get(i - 1); if (val instanceof Long) { - preparedStatement.setLong((i), Long.valueOf(StrUtil.toString(val))); + preparedStatement.setLong((i), Convert.toLong(val)); } else if (val instanceof Integer) { - preparedStatement.setInt((i), Integer.valueOf(StrUtil.toString(val))); + preparedStatement.setLong((i), Convert.toLong(val)); } else if (val instanceof Boolean) { - preparedStatement.setBoolean((i), Boolean.valueOf(StrUtil.toString(val))); + preparedStatement.setInt((i), Boolean.valueOf(StrUtil.toString(val)) ? 
1 : 0); } else { preparedStatement.setString((i), StrUtil.toString(val)); } diff --git a/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java b/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java deleted file mode 100644 index 7cb907e..0000000 --- a/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java +++ /dev/null @@ -1,213 +0,0 @@ -package com.zdjizhi.utils.general; - -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import com.zdjizhi.common.FlowWriteConfig; -import com.zdjizhi.utils.zookeeper.DistributedLock; -import com.zdjizhi.utils.zookeeper.ZookeeperUtils; - -/** - * 雪花算法 - * - * @author qidaijie - */ -public class SnowflakeId { - private static final Log logger = LogFactory.get(); - - /** - * 共64位 第一位为符号位 默认0 - * 时间戳 39位(17 year), centerId:(关联每个环境或任务数) :6位(0-63), - * workerId(关联进程):7(0-127) ,序列号:11位(2047/ms) - * - * 序列号 /ms = (-1L ^ (-1L << 11)) - * 最大使用年 = (1L << 39) / (1000L * 60 * 60 * 24 * 365) - */ - /** - * 开始时间截 (2020-11-14 00:00:00) max 17years - */ - private final long twepoch = 1605283200000L; - - /** - * 机器id所占的位数 - */ - private final long workerIdBits = 8L; - - /** - * 数据标识id所占的位数 - */ - private final long dataCenterIdBits = 5L; - - /** - * 支持的最大机器id,结果是63 (这个移位算法可以很快的计算出几位二进制数所能表示的最大十进制数) - * M << n = M * 2^n - */ - private final long maxWorkerId = -1L ^ (-1L << workerIdBits); - - /** - * 支持的最大数据标识id,结果是31 - */ - private final long maxDataCenterId = -1L ^ (-1L << dataCenterIdBits); - - /** - * 序列在id中占的位数 - */ - private final long sequenceBits = 11L; - - /** - * 机器ID向左移12位 - */ - private final long workerIdShift = sequenceBits; - - /** - * 数据标识id向左移17位(14+6) - */ - private final long dataCenterIdShift = sequenceBits + workerIdBits; - - /** - * 时间截向左移22位(4+6+14) - */ - private final long timestampLeftShift = sequenceBits + workerIdBits + dataCenterIdBits; - - /** - * 生成序列的掩码,这里为2047 - */ - private final long sequenceMask = -1L ^ (-1L << sequenceBits); - - /** - * 工作机器ID(0~255) - */ - private long workerId; - - /** - * 数据中心ID(0~31) - */ - private long dataCenterId; - - /** - * 毫秒内序列(0~2047) - */ - private long sequence = 0L; - - /** - * 上次生成ID的时间截 - */ - private long lastTimestamp = -1L; - - - /** - * 设置允许时间回拨的最大限制10s - */ - private static final long rollBackTime = 10000L; - - - private static SnowflakeId idWorker; - - private static ZookeeperUtils zookeeperUtils = new ZookeeperUtils(); - - static { - idWorker = new SnowflakeId(FlowWriteConfig.ZOOKEEPER_SERVERS, FlowWriteConfig.DATA_CENTER_ID_NUM); - } - - //==============================Constructors===================================== - - /** - * 构造函数 - */ - private SnowflakeId(String zookeeperIp, long dataCenterIdNum) { - DistributedLock lock = new DistributedLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "disLocks1"); - try { - lock.lock(); - int tmpWorkerId = zookeeperUtils.modifyNode("/Snowflake/" + "worker" + dataCenterIdNum, zookeeperIp); - if (tmpWorkerId > maxWorkerId || tmpWorkerId < 0) { - throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId)); - } - if (dataCenterIdNum > maxDataCenterId || dataCenterIdNum < 0) { - throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than ", maxDataCenterId)); - } - this.workerId = tmpWorkerId; - this.dataCenterId = dataCenterIdNum; - } catch (RuntimeException e) { - logger.error("This is not usual error!!!===>>>" + e + "<<<==="); - }finally { - lock.unlock(); - } - } - - // ==============================Methods========================================== - 
- /** - * 获得下一个ID (该方法是线程安全的) - * - * @return SnowflakeId - */ - private synchronized long nextId() { - long timestamp = timeGen(); - //设置一个允许回拨限制时间,系统时间回拨范围在rollBackTime内可以等待校准 - if (lastTimestamp - timestamp > 0 && lastTimestamp - timestamp < rollBackTime) { - timestamp = tilNextMillis(lastTimestamp); - } - //如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常 - if (timestamp < lastTimestamp) { - throw new RuntimeException( - String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp)); - } - - //如果是同一时间生成的,则进行毫秒内序列 - if (lastTimestamp == timestamp) { - sequence = (sequence + 1) & sequenceMask; - //毫秒内序列溢出 - if (sequence == 0) { - //阻塞到下一个毫秒,获得新的时间戳 - timestamp = tilNextMillis(lastTimestamp); - } - } - //时间戳改变,毫秒内序列重置 - else { - sequence = 0L; - } - - //上次生成ID的时间截 - lastTimestamp = timestamp; - - //移位并通过或运算拼到一起组成64位的ID - return ((timestamp - twepoch) << timestampLeftShift) - | (dataCenterId << dataCenterIdShift) - | (workerId << workerIdShift) - | sequence; - } - - /** - * 阻塞到下一个毫秒,直到获得新的时间戳 - * - * @param lastTimestamp 上次生成ID的时间截 - * @return 当前时间戳 - */ - protected long tilNextMillis(long lastTimestamp) { - long timestamp = timeGen(); - while (timestamp <= lastTimestamp) { - timestamp = timeGen(); - } - return timestamp; - } - - /** - * 返回以毫秒为单位的当前时间 - * - * @return 当前时间(毫秒) - */ - protected long timeGen() { - return System.currentTimeMillis(); - } - - - /** - * 静态工具类 - * - * @return - */ - public static Long generateId() { - return idWorker.nextId(); - } - - -} \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormMap.java b/src/main/java/com/zdjizhi/utils/general/TransFormMap.java index 42b7639..35312fc 100644 --- a/src/main/java/com/zdjizhi/utils/general/TransFormMap.java +++ b/src/main/java/com/zdjizhi/utils/general/TransFormMap.java @@ -65,9 +65,9 @@ public class TransFormMap { JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getCurrentTime()); } break; - case "snowflake_id": - JsonParseUtil.setValue(jsonMap, appendToKeyName, SnowflakeId.generateId()); - break; +// case "snowflake_id": +// JsonParseUtil.setValue(jsonMap, appendToKeyName, SnowflakeId.generateId()); +// break; case "geo_ip_detail": if (logValue != null && appendTo == null) { JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpDetail(logValue.toString())); diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java b/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java index 9e92576..f13894e 100644 --- a/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java +++ b/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java @@ -65,11 +65,11 @@ public class TransFormTypeMap { JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getCurrentTime()); } break; - case "snowflake_id": - JsonParseUtil.setValue(jsonMap, appendToKeyName, SnowflakeId.generateId()); - //版本规划暂不实现TSG-22.01 -// JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getDecimalHash(SnowflakeId.generateId())); - break; +// case "snowflake_id": +// JsonParseUtil.setValue(jsonMap, appendToKeyName, SnowflakeId.generateId()); +// //版本规划暂不实现TSG-22.01 +//// JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getDecimalHash(SnowflakeId.generateId())); +// break; case "geo_ip_detail": if (logValue != null && appendToKeyValue == null) { JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpDetail(logValue.toString())); diff --git 
a/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java b/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java deleted file mode 100644 index 1adb1d1..0000000 --- a/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java +++ /dev/null @@ -1,77 +0,0 @@ -package com.zdjizhi.utils.http; - -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import org.apache.commons.io.IOUtils; -import org.apache.http.HttpEntity; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; - -/** - * 获取网关schema的工具类 - * - * @author qidaijie - */ -public class HttpClientUtil { - private static final Log logger = LogFactory.get(); - - /** - * 请求网关获取schema - * - * @param http 网关url - * @return schema - */ - public static String requestByGetMethod(String http) { - CloseableHttpClient httpClient = HttpClients.createDefault(); - StringBuilder entityStringBuilder; - - HttpGet get = new HttpGet(http); - BufferedReader bufferedReader = null; - CloseableHttpResponse httpResponse = null; - try { - httpResponse = httpClient.execute(get); - HttpEntity entity = httpResponse.getEntity(); - entityStringBuilder = new StringBuilder(); - if (null != entity) { - bufferedReader = new BufferedReader(new InputStreamReader(httpResponse.getEntity().getContent(), "UTF-8"), 8 * 1024); - int intC; - while ((intC = bufferedReader.read()) != -1) { - char c = (char) intC; - if (c == '\n') { - break; - } - entityStringBuilder.append(c); - } - - return entityStringBuilder.toString(); - } - } catch (IOException e) { - logger.error("Get Schema from Query engine ERROR! Exception message is:" + e); - } finally { - if (httpClient != null) { - try { - httpClient.close(); - } catch (IOException e) { - logger.error("Close HTTP Client ERROR! Exception messgae is:" + e); - } - } - if (httpResponse != null) { - try { - httpResponse.close(); - } catch (IOException e) { - logger.error("Close httpResponse ERROR! 
Exception messgae is:" + e); - } - } - if (bufferedReader != null) { - IOUtils.closeQuietly(bufferedReader); - } - } - return ""; - } -} diff --git a/src/main/java/com/zdjizhi/utils/json/TypeUtils.java b/src/main/java/com/zdjizhi/utils/json/TypeUtils.java index b13627f..e7ba647 100644 --- a/src/main/java/com/zdjizhi/utils/json/TypeUtils.java +++ b/src/main/java/com/zdjizhi/utils/json/TypeUtils.java @@ -1,11 +1,15 @@ package com.zdjizhi.utils.json; +import cn.hutool.core.convert.Convert; +import cn.hutool.core.util.StrUtil; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.zdjizhi.common.FlowWriteConfig; import com.zdjizhi.utils.StringUtil; import com.zdjizhi.utils.exception.FlowWriteException; +import java.util.concurrent.TimeUnit; + /** * @author qidaijie * @Package PACKAGE_NAME @@ -168,4 +172,17 @@ public class TypeUtils { throw new FlowWriteException("can not cast to long, value : " + value); } + public static long coverMSToS(long ms) { + if (ms > 9_999_999_999L) { + return Convert.convertTime(ms, TimeUnit.MILLISECONDS, TimeUnit.SECONDS); + } + return ms; + } + + public static Object coverMSToS(Object ms) { + if (StrUtil.toString(ms).length() == 13) { + return StrUtil.sub(StrUtil.toString(ms), 0, 10); + } + return ms; + } } diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java index cf27cc3..0b98d0e 100644 --- a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java +++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java @@ -46,5 +46,4 @@ public class KafkaProducer { return kafkaProducer; } - } diff --git a/src/main/java/com/zdjizhi/utils/zookeeper/DistributedLock.java b/src/main/java/com/zdjizhi/utils/zookeeper/DistributedLock.java deleted file mode 100644 index 2afab03..0000000 --- a/src/main/java/com/zdjizhi/utils/zookeeper/DistributedLock.java +++ /dev/null @@ -1,190 +0,0 @@ -package com.zdjizhi.utils.zookeeper; - -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import org.apache.zookeeper.*; -import org.apache.zookeeper.data.Stat; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; - -/** - * @author qidaijie - */ -public class DistributedLock implements Lock, Watcher { - private static final Log logger = LogFactory.get(); - - private ZooKeeper zk = null; - /** - * 根节点 - */ - private final String ROOT_LOCK = "/locks"; - /** - * 竞争的资源 - */ - private String lockName; - /** - * 等待的前一个锁 - */ - private String waitLock; - /** - * 当前锁 - */ - private String currentLock; - /** - * 计数器 - */ - private CountDownLatch countDownLatch; - - private int sessionTimeout = 2000; - - private List exceptionList = new ArrayList(); - - /** - * 配置分布式锁 - * - * @param config 连接的url - * @param lockName 竞争资源 - */ - public DistributedLock(String config, String lockName) { - this.lockName = lockName; - try { - // 连接zookeeper - zk = new ZooKeeper(config, sessionTimeout, this); - Stat stat = zk.exists(ROOT_LOCK, false); - if (stat == null) { - // 如果根节点不存在,则创建根节点 - zk.create(ROOT_LOCK, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - } - } catch (IOException | InterruptedException | KeeperException e) { - logger.error("Node already exists!"); - } - } - - // 节点监视器 - @Override - public void process(WatchedEvent event) { - if (this.countDownLatch != null) 
{ - this.countDownLatch.countDown(); - } - } - - @Override - public void lock() { - if (exceptionList.size() > 0) { - throw new LockException(exceptionList.get(0)); - } - try { - if (this.tryLock()) { - logger.info(Thread.currentThread().getName() + " " + lockName + "获得了锁"); - } else { - // 等待锁 - waitForLock(waitLock, sessionTimeout); - } - } catch (InterruptedException | KeeperException e) { - logger.error("获取锁异常" + e); - } - } - - @Override - public boolean tryLock() { - try { - String splitStr = "_lock_"; - if (lockName.contains(splitStr)) { - throw new LockException("锁名有误"); - } - // 创建临时有序节点 - currentLock = zk.create(ROOT_LOCK + "/" + lockName + splitStr, new byte[0], - ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); - // 取所有子节点 - List subNodes = zk.getChildren(ROOT_LOCK, false); - // 取出所有lockName的锁 - List lockObjects = new ArrayList(); - for (String node : subNodes) { - String tmpNode = node.split(splitStr)[0]; - if (tmpNode.equals(lockName)) { - lockObjects.add(node); - } - } - Collections.sort(lockObjects); - // 若当前节点为最小节点,则获取锁成功 - if (currentLock.equals(ROOT_LOCK + "/" + lockObjects.get(0))) { - return true; - } - // 若不是最小节点,则找到自己的前一个节点 - String prevNode = currentLock.substring(currentLock.lastIndexOf("/") + 1); - waitLock = lockObjects.get(Collections.binarySearch(lockObjects, prevNode) - 1); - } catch (InterruptedException | KeeperException e) { - logger.error("获取锁过程异常" + e); - } - return false; - } - - - @Override - public boolean tryLock(long timeout, TimeUnit unit) { - try { - if (this.tryLock()) { - return true; - } - return waitForLock(waitLock, timeout); - } catch (KeeperException | InterruptedException | RuntimeException e) { - logger.error("判断是否锁定异常" + e); - } - return false; - } - - // 等待锁 - private boolean waitForLock(String prev, long waitTime) throws KeeperException, InterruptedException { - Stat stat = zk.exists(ROOT_LOCK + "/" + prev, true); - - if (stat != null) { - this.countDownLatch = new CountDownLatch(1); - // 计数等待,若等到前一个节点消失,则precess中进行countDown,停止等待,获取锁 - this.countDownLatch.await(waitTime, TimeUnit.MILLISECONDS); - this.countDownLatch = null; - } - return true; - } - - @Override - public void unlock() { - try { - zk.delete(currentLock, -1); - currentLock = null; - zk.close(); - } catch (InterruptedException | KeeperException e) { - logger.error("关闭锁异常" + e); - } - } - - @Override - public Condition newCondition() { - return null; - } - - @Override - public void lockInterruptibly() throws InterruptedException { - this.lock(); - } - - - public class LockException extends RuntimeException { - private static final long serialVersionUID = 1L; - - public LockException(String e) { - super(e); - } - - public LockException(Exception e) { - super(e); - } - } - -} \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/utils/zookeeper/ZookeeperUtils.java b/src/main/java/com/zdjizhi/utils/zookeeper/ZookeeperUtils.java deleted file mode 100644 index 9efbd46..0000000 --- a/src/main/java/com/zdjizhi/utils/zookeeper/ZookeeperUtils.java +++ /dev/null @@ -1,140 +0,0 @@ -package com.zdjizhi.utils.zookeeper; - -import cn.hutool.core.util.StrUtil; -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import org.apache.zookeeper.*; -import org.apache.zookeeper.data.ACL; -import org.apache.zookeeper.data.Stat; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.CountDownLatch; - -/** - * @author qidaijie - * @Package cn.ac.iie.utils.zookeeper - * @Description: - * @date 2020/11/1411:28 - */ -public class 
ZookeeperUtils implements Watcher { - private static final Log logger = LogFactory.get(); - private static final int ID_MAX = 255; - - private ZooKeeper zookeeper; - - private static final int SESSION_TIME_OUT = 20000; - - private CountDownLatch countDownLatch = new CountDownLatch(1); - - @Override - public void process(WatchedEvent event) { - if (event.getState() == Event.KeeperState.SyncConnected) { - countDownLatch.countDown(); - } - } - - - /** - * 修改节点信息 - * - * @param path 节点路径 - */ - public int modifyNode(String path, String zookeeperIp) { - createNode(path, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, zookeeperIp); - int workerId = 0; - try { - connectZookeeper(zookeeperIp); - Stat stat = zookeeper.exists(path, true); - workerId = Integer.parseInt(getNodeDate(path)); - if (workerId > ID_MAX) { - workerId = 0; - zookeeper.setData(path, "1".getBytes(), stat.getVersion()); - } else { - String result = String.valueOf(workerId + 1); - if (stat != null) { - zookeeper.setData(path, result.getBytes(), stat.getVersion()); - } else { - logger.error("Node does not exist!,Can't modify"); - } - } - } catch (KeeperException | InterruptedException e) { - logger.error("modify error Can't modify," + e); - } finally { - closeConn(); - } - logger.warn("workerID is:" + workerId); - return workerId; - } - - /** - * 连接zookeeper - * - * @param host 地址 - */ - public void connectZookeeper(String host) { - try { - zookeeper = new ZooKeeper(host, SESSION_TIME_OUT, this); - countDownLatch.await(); - } catch (IOException | InterruptedException e) { - logger.error("Connection to the Zookeeper Exception! message:" + e); - } - } - - /** - * 关闭连接 - */ - public void closeConn() { - try { - if (zookeeper != null) { - zookeeper.close(); - } - } catch (InterruptedException e) { - logger.error("Close the Zookeeper connection Exception! message:" + e); - } - } - - /** - * 获取节点内容 - * - * @param path 节点路径 - * @return 内容/异常null - */ - public String getNodeDate(String path) { - String result = null; - Stat stat = new Stat(); - try { - byte[] resByte = zookeeper.getData(path, true, stat); - - result = StrUtil.str(resByte, "UTF-8"); - } catch (KeeperException | InterruptedException e) { - logger.error("Get node information exception" + e); - } - return result; - } - - /** - * @param path 节点创建的路径 - * @param date 节点所存储的数据的byte[] - * @param acls 控制权限策略 - */ - public void createNode(String path, byte[] date, List acls, String zookeeperIp) { - try { - connectZookeeper(zookeeperIp); - Stat exists = zookeeper.exists(path, true); - if (exists == null) { - Stat existsSnowflakeld = zookeeper.exists("/Snowflake", true); - if (existsSnowflakeld == null) { - zookeeper.create("/Snowflake", null, acls, CreateMode.PERSISTENT); - } - zookeeper.create(path, date, acls, CreateMode.PERSISTENT); - } else { - logger.warn("Node already exists ! Don't need to create"); - } - } catch (KeeperException | InterruptedException e) { - logger.error(e); - } finally { - closeConn(); - } - } -}
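
Note on the timestamp normalization introduced by this patch: the new ConnTimeMapFunction, SketchTimeMapFunction and the DnsMapFunction change all route epoch fields through TypeUtils.coverMSToS before the watermark assigners multiply them by 1000, so the windows see seconds rather than milliseconds. The following is a minimal, self-contained sketch of that behaviour, not the project code; the field names mirror the patch, and the heuristic (values above 9,999,999,999 / 13-digit values are treated as milliseconds) is taken from the two new coverMSToS overloads and is an assumption about the intended contract.

    import java.util.HashMap;
    import java.util.Map;

    /**
     * Sketch only: mirrors the ms -> s normalization done by the new
     * *TimeMapFunction classes before event-time windowing.
     * Assumption: 13-digit epoch values are milliseconds; 10-digit values
     * are already seconds and pass through unchanged.
     */
    public class TimeNormalizationSketch {

        // Mirrors TypeUtils.coverMSToS(long): convert only when the value
        // is clearly an epoch-millisecond timestamp.
        static long msToSeconds(long epoch) {
            return epoch > 9_999_999_999L ? epoch / 1000 : epoch;
        }

        // Mirrors ConnTimeMapFunction.map(): normalize the fields the
        // watermark assigner later multiplies by 1000.
        static Map<String, Object> normalizeConn(Map<String, Object> log) {
            log.put("conn_start_time", msToSeconds(((Number) log.get("conn_start_time")).longValue()));
            log.put("log_gen_time", msToSeconds(((Number) log.get("log_gen_time")).longValue()));
            return log;
        }

        public static void main(String[] args) {
            Map<String, Object> log = new HashMap<>();
            log.put("conn_start_time", 1658476800123L); // milliseconds
            log.put("log_gen_time", 1658476801L);       // already seconds
            System.out.println(normalizeConn(log));     // both printed as seconds
        }
    }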
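Note on the ClickhouseSink change: the patch now calls connection.prepareStatement inside the per-row loop, which copes with rows whose key sets differ but gives up statement reuse and JDBC batching. Where the rows written to a given table are known to share one column layout, a conventional alternative is to prepare once and use addBatch/executeBatch. The sketch below illustrates that pattern under the stated same-key-set assumption; it is not the project's sink and the helper names (preparedSql, executeInsert) are reused here only for familiarity.

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Map;

    /**
     * Hedged sketch: batch insert with a single PreparedStatement, assuming
     * every row destined for tableName exposes the same key set. The real
     * sink in this patch prepares a statement per row precisely because that
     * assumption may not hold for its data.
     */
    public class BatchInsertSketch {

        static String preparedSql(List<String> keys, String tableName) {
            String cols = String.join(",", keys);
            String marks = String.join(",", keys.stream().map(k -> "?").toArray(String[]::new));
            return "INSERT INTO " + tableName + " (" + cols + ") VALUES (" + marks + ")";
        }

        static void executeInsert(Connection connection, List<Map<String, Object>> data, String tableName)
                throws SQLException {
            if (data.isEmpty()) {
                return;
            }
            List<String> keys = new LinkedList<>(data.get(0).keySet());
            connection.setAutoCommit(false);
            try (PreparedStatement ps = connection.prepareStatement(preparedSql(keys, tableName))) {
                for (Map<String, Object> row : data) {
                    for (int i = 0; i < keys.size(); i++) {
                        // setObject lets the driver coerce Long/Integer/Boolean/String values.
                        ps.setObject(i + 1, row.get(keys.get(i)));
                    }
                    ps.addBatch();
                }
                ps.executeBatch();
                connection.commit();
            }
        }
    }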