diff --git a/pom.xml b/pom.xml
index 315fcf0..de2b189 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,8 +5,8 @@
     <modelVersion>4.0.0</modelVersion>
     <groupId>com.zdjizhi</groupId>
-    <artifactId>log-completion-schema</artifactId>
-    <version>220318-Nacos</version>
+    <artifactId>knowledge-log</artifactId>
+    <version>202207</version>
     <name>log-completion-schema</name>
     <url>http://www.example.com</url>
@@ -35,10 +35,9 @@
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         1.13.1
         1.0.0
-        <nacos.version>1.2.0</nacos.version>
         1.0.8
-
+
@@ -230,13 +229,6 @@
             <scope>test</scope>
         </dependency>
-
-        <dependency>
-            <groupId>com.alibaba.nacos</groupId>
-            <artifactId>nacos-client</artifactId>
-            <version>${nacos.version}</version>
-        </dependency>
-
         <dependency>
             <groupId>org.jasypt</groupId>
@@ -249,13 +241,8 @@
         <dependency>
             <groupId>ru.yandex.clickhouse</groupId>
             <artifactId>clickhouse-jdbc</artifactId>
-            <version>0.2.3</version>
+            <version>0.2.6</version>
         </dependency>
-
-
-
-
-
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-table-planner-blink_2.12</artifactId>
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index 2308097..91858cd 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -1,22 +1,22 @@
-#--------------------------------地址配置------------------------------#
-#管理kafka地址
+#--------------------------------\u5730\u5740\u914D\u7F6E------------------------------#
+#\u7BA1\u7406kafka\u5730\u5740
 source.kafka.servers=192.168.45.102:9092
-#管理输出kafka地址
+#\u7BA1\u7406\u8F93\u51FAkafka\u5730\u5740
 sink.kafka.servers=192.168.45.102:9092
-#zookeeper 地址 用于配置log_id
+#zookeeper \u5730\u5740 \u7528\u4E8E\u914D\u7F6Elog_id
 zookeeper.servers=192.168.45.102:2181
-#hbase zookeeper地址 用于连接HBase
+#hbase zookeeper\u5730\u5740 \u7528\u4E8E\u8FDE\u63A5HBase
 hbase.zookeeper.servers=192.168.45.102:2181
 
-#--------------------------------HTTP/定位库------------------------------#
-#定位库地址
+#--------------------------------HTTP/\u5B9A\u4F4D\u5E93------------------------------#
+#\u5B9A\u4F4D\u5E93\u5730\u5740
 tools.library=D:\\workerspace\\dat\\
 
-#--------------------------------nacos配置------------------------------#
-#nacos 地址
+#--------------------------------nacos\u914D\u7F6E------------------------------#
+#nacos \u5730\u5740
 nacos.server=192.168.45.102:8848
 
 #nacos namespace
@@ -25,39 +25,39 @@ nacos.schema.namespace=prod
 #nacos data id
 nacos.data.id=session_record.json
 
-#--------------------------------Kafka消费/生产配置------------------------------#
+#--------------------------------Kafka\u6D88\u8D39/\u751F\u4EA7\u914D\u7F6E------------------------------#
 
-#kafka 接收数据topic
+#kafka \u63A5\u6536\u6570\u636Etopic
 source.kafka.topic=atest
 
-#补全数据 输出 topic
+#\u8865\u5168\u6570\u636E \u8F93\u51FA topic
 sink.kafka.topic=atest2
 
-#读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据;
+#\u8BFB\u53D6topic,\u5B58\u50A8\u8BE5spout id\u7684\u6D88\u8D39offset\u4FE1\u606F\uFF0C\u53EF\u901A\u8FC7\u8BE5\u62D3\u6251\u547D\u540D;\u5177\u4F53\u5B58\u50A8offset\u7684\u4F4D\u7F6E\uFF0C\u786E\u5B9A\u4E0B\u6B21\u8BFB\u53D6\u4E0D\u91CD\u590D\u7684\u6570\u636E\uFF1B
 group.id=flinktest-102
 
-#--------------------------------topology配置------------------------------#
+#--------------------------------topology\u914D\u7F6E------------------------------#
 
-#consumer 并行度
+#consumer \u5E76\u884C\u5EA6
 source.parallelism=1
 
-#转换函数并行度
+#\u8F6C\u6362\u51FD\u6570\u5E76\u884C\u5EA6
 transform.parallelism=1
 
-#kafka producer 并行度
+#kafka producer \u5E76\u884C\u5EA6
 sink.parallelism=1
 
-#数据中心,取值范围(0-31)
+#\u6570\u636E\u4E2D\u5FC3\uFF0C\u53D6\u503C\u8303\u56F4(0-31)
 data.center.id.num=0
 
-#hbase 更新时间,如填写0则不更新缓存
+#hbase \u66F4\u65B0\u65F6\u95F4\uFF0C\u5982\u586B\u51990\u5219\u4E0D\u66F4\u65B0\u7F13\u5B58
 hbase.tick.tuple.freq.secs=180
 
-#--------------------------------默认值配置------------------------------#
-#0不需要补全原样输出日志,1需要补全
-log.need.complete=1
+#--------------------------------\u9ED8\u8BA4\u503C\u914D\u7F6E------------------------------#
+#1 connection\u65E5\u5FD7 \uFF0C2 dns\u65E5\u5FD7
+log.need.complete=2
 
-#生产者压缩模式 none or snappy
+#\u751F\u4EA7\u8005\u538B\u7F29\u6A21\u5F0F none or snappy
 producer.kafka.compression.type=none
 
@@ -77,29 +77,29 @@ sink.arango.table.r.mx.domain2domain=R_MX_DOMAIN2DOMAIN
 sink.arango.table.r.resolve.domain2ip=R_RESOLVE_DOMAIN2IP
 sink.arango.table.r.nx.domain2domain=R_NX_DOMAIN2DOMAIN
 
-#clickhouse 入库
+#clickhouse \u5165\u5E93
 ck.hosts=192.168.45.102:8123
 ck.database=tsg_galaxy_v3
 ck.username=default
 ck.pin=galaxy2019
-ck.connection.timeout=100000
-ck.socket.timeout=1000000
+#\u5355\u4F4D\u6BEB\u79D2
+ck.connection.timeout=10000
+ck.socket.timeout=300000
 
 #connection_record_log
-flink.watermark.max.orderness=100000
-#统计时间间隔 单位s
+flink.watermark.max.orderness=50
+#\u7EDF\u8BA1\u65F6\u95F4\u95F4\u9694 \u5355\u4F4Ds
 log.aggregate.duration=5
 log.aggregate.duration.graph=5
 
-#arangoDB参数配置
+#arangoDB\u53C2\u6570\u914D\u7F6E
 arangoDB.host=192.168.45.102
 #arangoDB.host=192.168.40.224
 arangoDB.port=8529
-arangoDB.user=upsert
-arangoDB.password=galaxy2018
+arangoDB.user=root
+arangoDB.password=galaxy_2019
 arangoDB.DB.name=knowledge
-#arangoDB.DB.name=tsg_galaxy_v3
 arangoDB.batch=100000
 arangoDB.ttl=3600
 
@@ -109,5 +109,5 @@ update.arango.batch=10000
 thread.pool.number=10
 thread.await.termination.time=10
 
-sink.batch.time.out=1
+sink.batch.time.out=5
 sink.batch=10000
\ No newline at end of file
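Two of the retuned values above are worth a note: ck.connection.timeout and ck.socket.timeout are now documented as milliseconds and shrunk to 10 s / 300 s, which matches how the reworked ClickhouseSink later feeds them to the JDBC driver. A minimal sketch of that wiring, assuming CK_CONNECTION_TIMEOUT and CK_SOCKET_TIMEOUT are the constants parsed from these two properties:

    // hedged sketch, not the project's literal code: how the timeout
    // properties reach clickhouse-jdbc via ClickHouseProperties
    ClickHouseProperties props = new ClickHouseProperties();
    props.setConnectionTimeout(10000);   // ck.connection.timeout, in ms
    props.setSocketTimeout(300000);      // ck.socket.timeout, in ms
    BalancedClickhouseDataSource ds =
            new BalancedClickhouseDataSource("jdbc:clickhouse://192.168.45.102:8123", props);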
diff --git a/src/main/java/com/zdjizhi/common/ArangoDelayProcess.java b/src/main/java/com/zdjizhi/common/ArangoDelayProcess.java
deleted file mode 100644
index d39e6c5..0000000
--- a/src/main/java/com/zdjizhi/common/ArangoDelayProcess.java
+++ /dev/null
@@ -1,65 +0,0 @@
-package com.zdjizhi.common;
-
-import com.arangodb.entity.BaseDocument;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.util.Collector;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Spliterator;
-import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
-
-public class ArangoDelayProcess extends ProcessFunction<BaseDocument, List<BaseDocument>> {
-
-    private ValueState<Long> currentTimer;
-    private ListState<BaseDocument> itemState;
-    private String stateName;
-
-    @Override
-    public void open(Configuration parameters) throws Exception {
-        currentTimer = getRuntimeContext().getState(new ValueStateDescriptor<>(getStateName() + "_timer", Types.LONG));
-        ListStateDescriptor<BaseDocument> itemViewStateDesc = new ListStateDescriptor(getStateName() + "_state", Map.class);
-        itemState = getRuntimeContext().getListState(itemViewStateDesc);
-    }
-
-    @Override
-    public void processElement(BaseDocument value, Context context, Collector<List<BaseDocument>> collector) throws Exception {
-        // if no timer is registered yet, register a new one
-        Long curTimeStamp = currentTimer.value();
-        if (curTimeStamp == null || curTimeStamp == 0) {
-            long onTimer = context.timestamp() + FlowWriteConfig.SINK_BATCH_TIME_OUT * 1000;
-            context.timerService().registerEventTimeTimer(onTimer);
-            currentTimer.update(onTimer);
-        }
-        itemState.add(value);
-    }
-
-    @Override
-    public void onTimer(long timestamp, OnTimerContext ctx, Collector<List<BaseDocument>> out) throws Exception {
-        Spliterator<BaseDocument> spliterator = itemState.get().spliterator();
-        List<BaseDocument> collect = StreamSupport.stream(spliterator, false)
-                .collect(Collectors.toList());
-        out.collect(collect);
-        currentTimer.clear();
-        itemState.clear();
-    }
-
-    public ArangoDelayProcess(String stateName) {
-        this.stateName = stateName;
-    }
-
-    public String getStateName() {
-        return stateName;
-    }
-
-    public void setStateName(String stateName) {
-        this.stateName = stateName;
-    }
-}
diff --git a/src/main/java/com/zdjizhi/common/ArangodbDnsWindow.java b/src/main/java/com/zdjizhi/common/ArangodbDnsWindow.java
new file mode 100644
index 0000000..373eef5
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/ArangodbDnsWindow.java
@@ -0,0 +1,36 @@
+package com.zdjizhi.common;
+
+import cn.hutool.core.util.StrUtil;
+import com.arangodb.entity.BaseEdgeDocument;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class ArangodbDnsWindow implements AllWindowFunction<Map<String, Object>, List<BaseEdgeDocument>, TimeWindow> {
+
+    @Override
+    public void apply(TimeWindow timeWindow, Iterable<Map<String, Object>> iterable, Collector<List<BaseEdgeDocument>> out) throws Exception {
+        Iterator<Map<String, Object>> iterator = iterable.iterator();
+        List<BaseEdgeDocument> batchLog = new ArrayList<>();
+        while (iterator.hasNext()) {
+            Map<String, Object> next = iterator.next();
+            String qname = StrUtil.toString(next.get("qname"));
+            String record = StrUtil.toString(next.get("record"));
+            BaseEdgeDocument baseEdgeDocument = new BaseEdgeDocument();
+            baseEdgeDocument.setKey(String.join("-", qname, record));
+            baseEdgeDocument.setFrom("qname/" + qname);
+            baseEdgeDocument.setTo("record/" + record);
+            baseEdgeDocument.addAttribute("qname", qname);
+            baseEdgeDocument.addAttribute("record", record);
+            baseEdgeDocument.addAttribute("last_found_time", next.get("last_found_time"));
+
+            batchLog.add(baseEdgeDocument);
+        }
+        out.collect(batchLog);
+    }
+}
diff --git a/src/main/java/com/zdjizhi/common/ArangodbIPWindow.java b/src/main/java/com/zdjizhi/common/ArangodbIPWindow.java
new file mode 100644
index 0000000..f91fb08
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/ArangodbIPWindow.java
@@ -0,0 +1,36 @@
+package com.zdjizhi.common;
+
+import cn.hutool.core.util.StrUtil;
+import com.arangodb.entity.BaseEdgeDocument;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class ArangodbIPWindow implements AllWindowFunction<Map<String, Object>, List<BaseEdgeDocument>, TimeWindow> {
+
+    @Override
+    public void apply(TimeWindow timeWindow, Iterable<Map<String, Object>> iterable, Collector<List<BaseEdgeDocument>> collector) throws Exception {
+        Iterator<Map<String, Object>> iterator = iterable.iterator();
+        List<BaseEdgeDocument> batchLog = new ArrayList<>();
+        while (iterator.hasNext()) {
+            Map<String, Object> next = iterator.next();
+            String srcIp = StrUtil.toString(next.get("src_ip"));
+            String dstIp = StrUtil.toString(next.get("dst_ip"));
+            BaseEdgeDocument baseEdgeDocument = new BaseEdgeDocument();
+            baseEdgeDocument.setKey(String.join("-", srcIp, dstIp));
+            baseEdgeDocument.setFrom("src_ip/" + srcIp);
+            baseEdgeDocument.setTo("dst_ip/" + dstIp);
+            baseEdgeDocument.addAttribute("src_ip", srcIp);
+            baseEdgeDocument.addAttribute("dst_ip", dstIp);
+            baseEdgeDocument.addAttribute("last_found_time", next.get("last_found_time"));
+
+            batchLog.add(baseEdgeDocument);
+        }
+        collector.collect(batchLog);
+    }
+}
diff --git a/src/main/java/com/zdjizhi/common/CKDelayProcess.java b/src/main/java/com/zdjizhi/common/CKDelayProcess.java
deleted file mode 100644
index 35ec90e..0000000
--- a/src/main/java/com/zdjizhi/common/CKDelayProcess.java
+++ /dev/null
@@ -1,65 +0,0 @@
-package com.zdjizhi.common;
-
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.util.Collector;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Spliterator;
-import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
-
-public class CKDelayProcess extends ProcessFunction<Map<String, Object>, List<Map<String, Object>>> {
-
-
-    private ValueState<Long> currentTimer;
-    private ListState<Map<String, Object>> itemState;
-    private String stateName;
-
-    @Override
-    public void open(Configuration parameters) throws Exception {
-        currentTimer = getRuntimeContext().getState(new ValueStateDescriptor<>(getStateName() + "_timer", Types.LONG));
-        ListStateDescriptor<Map<String, Object>> itemViewStateDesc = new ListStateDescriptor(getStateName() + "_state", Map.class);
-        itemState = getRuntimeContext().getListState(itemViewStateDesc);
-    }
-
-    @Override
-    public void processElement(Map<String, Object> value, Context context, Collector<List<Map<String, Object>>> collector) throws Exception {
-        // if no timer is registered yet, register a new one
-        Long curTimeStamp = currentTimer.value();
-        if (curTimeStamp == null || curTimeStamp == 0) {
-            long onTimer = context.timestamp() + FlowWriteConfig.SINK_BATCH_TIME_OUT * 1000;
-            context.timerService().registerEventTimeTimer(onTimer);
-            currentTimer.update(onTimer);
-        }
-        itemState.add(value);
-    }
-
-    @Override
-    public void onTimer(long timestamp, OnTimerContext ctx, Collector<List<Map<String, Object>>> out) throws Exception {
-        Spliterator<Map<String, Object>> spliterator = itemState.get().spliterator();
-        List<Map<String, Object>> collect = StreamSupport.stream(spliterator, false)
-                .collect(Collectors.toList());
-        out.collect(collect);
-        currentTimer.clear();
-        itemState.clear();
-    }
-
-    public CKDelayProcess(String stateName) {
-        this.stateName = stateName;
-    }
-
-    public String getStateName() {
-        return stateName;
-    }
-
-    public void setStateName(String stateName) {
-        this.stateName = stateName;
-    }
-}
diff --git a/src/main/java/com/zdjizhi/common/CKWindow.java b/src/main/java/com/zdjizhi/common/CKWindow.java
new file mode 100644
index 0000000..b7c7b8c
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/CKWindow.java
@@ -0,0 +1,24 @@
+package com.zdjizhi.common;
+
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class CKWindow implements AllWindowFunction<Map<String, Object>, List<Map<String, Object>>, TimeWindow> {
+
+    @Override
+    public void apply(TimeWindow timeWindow, Iterable<Map<String, Object>> iterable, Collector<List<Map<String, Object>>> out) throws Exception {
+        Iterator<Map<String, Object>> iterator = iterable.iterator();
+        List<Map<String, Object>> batchLog = new ArrayList<>();
+        while (iterator.hasNext()) {
+            Map<String, Object> next = iterator.next();
+            batchLog.add(next);
+        }
+        out.collect(batchLog);
+    }
+}
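The three new window classes replace the keyed ProcessFunction-plus-event-time-timer batching of ArangoDelayProcess/CKDelayProcess with a collect-everything AllWindowFunction. A hedged sketch of how CKWindow is wired downstream (this exact pattern appears in LogFlowWriteTopology below):

    // batch records for SINK_BATCH_TIME_OUT seconds of wall-clock time,
    // then hand the whole list to the JDBC sink in one invoke() call
    stream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT)))
          .apply(new CKWindow())
          .addSink(new ClickhouseSink(SINK_CK_TABLE_CONNECTION));

Note that windowAll is a non-keyed window, so the batching operator effectively runs with parallelism 1, and batching is now driven by the wall clock rather than by event-time timers and watermarks.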
diff --git a/src/main/java/com/zdjizhi/common/ConnKeysSelector.java b/src/main/java/com/zdjizhi/common/ConnKeysSelector.java
deleted file mode 100644
index 6012626..0000000
--- a/src/main/java/com/zdjizhi/common/ConnKeysSelector.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package com.zdjizhi.common;
-
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-
-import java.util.Map;
-
-/**
- * @description:
- * @author: zhq
- * @create: 2022-07-05
- **/
-@Deprecated
-public class ConnKeysSelector implements KeySelector<Map<String, Object>, String> {
-
-    @Override
-    public String getKey(Map<String, Object> log) throws Exception {
-        return String.valueOf(log.get("conn_start_time"));
-    }
-}
diff --git a/src/main/java/com/zdjizhi/common/DnsTimeKeysSelector.java b/src/main/java/com/zdjizhi/common/DnsTimeKeysSelector.java
deleted file mode 100644
index 54af74c..0000000
--- a/src/main/java/com/zdjizhi/common/DnsTimeKeysSelector.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package com.zdjizhi.common;
-
-
-import org.apache.flink.api.java.functions.KeySelector;
-
-import java.util.Map;
-
-/**
- * @description:
- * @author: zhq
- * @create: 2022-07-05
- **/
-public class DnsTimeKeysSelector implements KeySelector<Map<String, Object>, String> {
-
-    @Override
-    public String getKey(Map<String, Object> log) throws Exception {
-        return String.valueOf(log.get("capture_time"));
-    }
-}
diff --git a/src/main/java/com/zdjizhi/common/IpKeysSelector.java b/src/main/java/com/zdjizhi/common/IpKeysSelector.java
index 9528a1b..2dc7e79 100644
--- a/src/main/java/com/zdjizhi/common/IpKeysSelector.java
+++ b/src/main/java/com/zdjizhi/common/IpKeysSelector.java
@@ -15,7 +15,6 @@ public class IpKeysSelector implements KeySelector<Map<String, Object>, Tuple2<String, String>> {
 
     @Override
     public Tuple2<String, String> getKey(Map<String, Object> log) throws Exception {
-
         return Tuple2.of(
                 String.valueOf(log.get("src_ip")),
                 String.valueOf(log.get("dst_ip")));
diff --git a/src/main/java/com/zdjizhi/common/KeysSelector.java b/src/main/java/com/zdjizhi/common/KeysSelector.java
deleted file mode 100644
index bfa2083..0000000
--- a/src/main/java/com/zdjizhi/common/KeysSelector.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package com.zdjizhi.common;
-
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-
-import java.util.Map;
-
-/**
- * @description:
- * @author: zhq
- * @create: 2022-07-05
- **/
-@Deprecated
-public class KeysSelector implements KeySelector<Map<String, Object>, Tuple2<String, String>> {
-
-    @Override
-    public Tuple2<String, String> getKey(Map<String, Object> log) throws Exception {
-
-        return Tuple2.of(
-                String.valueOf(log.get("src_ip")),
-                String.valueOf(log.get("dst_ip")));
-    }
-}
diff --git a/src/main/java/com/zdjizhi/common/ListWindow.java b/src/main/java/com/zdjizhi/common/ListWindow.java
deleted file mode 100644
index c5f6161..0000000
--- a/src/main/java/com/zdjizhi/common/ListWindow.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package com.zdjizhi.common;
-
-/**
- * @description:
- * @author: zhq
- * @create: 2022-07-10
- **/
-public class ListWindow {
-
-
-
-}
diff --git a/src/main/java/com/zdjizhi/common/SketchKeysSelector.java b/src/main/java/com/zdjizhi/common/SketchKeysSelector.java
deleted file mode 100644
index 373a557..0000000
--- a/src/main/java/com/zdjizhi/common/SketchKeysSelector.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package com.zdjizhi.common;
-
-
-import org.apache.flink.api.java.functions.KeySelector;
-
-import java.util.Map;
-
-/**
- * @description:
- * @author: zhq
- * @create: 2022-07-05
- **/
-public class SketchKeysSelector implements KeySelector<Map<String, Object>, String> {
-
-    @Override
-    public String getKey(Map<String, Object> log) throws Exception {
-        return String.valueOf(log.get("sketch_start_time"));
-    }
-}
diff --git a/src/main/java/com/zdjizhi/common/StartTimeKeysSelector.java b/src/main/java/com/zdjizhi/common/StartTimeKeysSelector.java
deleted file mode 100644
index 37ad5d5..0000000
--- a/src/main/java/com/zdjizhi/common/StartTimeKeysSelector.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package com.zdjizhi.common;
-
-
-import org.apache.flink.api.java.functions.KeySelector;
-
-import java.util.Map;
-
-/**
- * @description:
- * @author: zhq
- * @create: 2022-07-05
- **/
-public class StartTimeKeysSelector implements KeySelector<Map<String, Object>, String> {
-
-    @Override
-    public String getKey(Map<String, Object> log) throws Exception {
-        return String.valueOf(log.get("start_time"));
-    }
-}
diff --git a/src/main/java/com/zdjizhi/enums/DnsType.java b/src/main/java/com/zdjizhi/enums/DnsType.java
index 2d8c985..a5c5586 100644
--- a/src/main/java/com/zdjizhi/enums/DnsType.java
+++ b/src/main/java/com/zdjizhi/enums/DnsType.java
@@ -9,11 +9,11 @@ import static com.zdjizhi.common.FlowWriteConfig.*;
  **/
 public enum DnsType {
     // dns type, its code, and the target table
-    A("a", " 0x0001", R_RESOLVE_DOMAIN2IP),
-    AAAA("aaaa", " 0x001c", R_RESOLVE_DOMAIN2IP),
-    CNAME("cname", " 0x0005", R_CNAME_DOMAIN2DOMAIN),
-    MX("mx", " 0x000f", R_MX_DOMAIN2DOMAIN),
-    NS("ns", " 0x0002", R_NX_DOMAIN2DOMAIN);
+    A("a", "0x0001", R_RESOLVE_DOMAIN2IP),
+    AAAA("aaaa", "0x001c", R_RESOLVE_DOMAIN2IP),
+    CNAME("cname", "0x0005", R_CNAME_DOMAIN2DOMAIN),
+    MX("mx", "0x000f", R_MX_DOMAIN2DOMAIN),
+    NS("ns", "0x0002", R_NX_DOMAIN2DOMAIN);
 
     private String type;
     private String code;
diff --git a/src/main/java/com/zdjizhi/etl/ConnProcessFunction.java b/src/main/java/com/zdjizhi/etl/ConnProcessFunction.java
index 6ed9eef..49041dc 100644
--- a/src/main/java/com/zdjizhi/etl/ConnProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/ConnProcessFunction.java
@@ -1,13 +1,13 @@
 package com.zdjizhi.etl;
 
 import cn.hutool.core.convert.Convert;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.LinkedHashMap;
 import java.util.Map;
@@ -20,7 +20,7 @@ import static com.zdjizhi.common.FlowWriteConfig.LOG_AGGREGATE_DURATION;
  */
 public class ConnProcessFunction extends ProcessWindowFunction<Map<String, Object>, Map<String, Object>, Tuple2<String, String>, TimeWindow> {
 
-    private static final Logger logger = LoggerFactory.getLogger(ConnProcessFunction.class);
+    private static final Log logger = LogFactory.get();
 
     @Override
     public void process(Tuple2<String, String> keys, Context context, Iterable<Map<String, Object>> elements, Collector<Map<String, Object>> out) {
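The DnsType change above only strips a stray leading space from each code constant, but it matters: DnsMapFunction dispatches with DnsType.A.getCode().equals(type), and assuming the logs carry the hex code without a leading space, the old constants could never match. For illustration:

    boolean old = " 0x0001".equals("0x0001");    // false - old constant with leading space
    boolean fixed = "0x0001".equals("0x0001");   // true  - trimmed constant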
diff --git a/src/main/java/com/zdjizhi/etl/DnsGraphMapFunction.java b/src/main/java/com/zdjizhi/etl/DnsGraphMapFunction.java
deleted file mode 100644
index bc9a73d..0000000
--- a/src/main/java/com/zdjizhi/etl/DnsGraphMapFunction.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package com.zdjizhi.etl;
-
-import cn.hutool.core.util.StrUtil;
-import com.arangodb.entity.BaseDocument;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-
-/**
- * deduplication
- */
-public class DnsGraphMapFunction extends RichMapFunction<Map<String, Object>, BaseDocument> {
-
-    private static final Logger logger = LoggerFactory.getLogger(DnsGraphMapFunction.class);
-
-    @Override
-    public BaseDocument map(Map<String, Object> map) throws Exception {
-        try {
-            BaseDocument baseDocument = new BaseDocument();
-            baseDocument.setKey(String.join("-", StrUtil.toString(map.get("qname")), StrUtil.toString(map.get("record"))));
-            baseDocument.addAttribute("qname", map.get("qname"));
-            baseDocument.addAttribute("record", map.get("record"));
-            baseDocument.addAttribute("last_found_time", map.get("start_time"));
-            return baseDocument;
-        } catch (Exception e) {
-            logger.error("dns record type conversion error: {}", e);
-        }
-        return null;
-    }
-}
diff --git a/src/main/java/com/zdjizhi/etl/DnsGraphProcessFunction.java b/src/main/java/com/zdjizhi/etl/DnsGraphProcessFunction.java
index c438a14..18d7a71 100644
--- a/src/main/java/com/zdjizhi/etl/DnsGraphProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/DnsGraphProcessFunction.java
@@ -1,12 +1,12 @@
 package com.zdjizhi.etl;
 
 import cn.hutool.core.convert.Convert;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.LinkedHashMap;
 import java.util.Map;
@@ -17,15 +17,15 @@ import java.util.Map;
  */
 public class DnsGraphProcessFunction extends ProcessWindowFunction<Map<String, Object>, Map<String, Object>, Tuple3<String, String, String>, TimeWindow> {
 
-    private static final Logger logger = LoggerFactory.getLogger(DnsGraphProcessFunction.class);
+    private static final Log logger = LogFactory.get();
 
     @Override
     public void process(Tuple3<String, String, String> keys, Context context, Iterable<Map<String, Object>> elements, Collector<Map<String, Object>> out) {
 
         try {
-            long tmpTime = 0L;
+            Long tmpTime = 0L;
             for (Map<String, Object> log : elements) {
-                long startTime = Convert.toLong(log.get("capure_time"));
+                Long startTime = Convert.toLong(log.get("start_time"));
                 tmpTime = startTime > tmpTime ? startTime : tmpTime;
             }
             Map<String, Object> newLog = new LinkedHashMap<>();
diff --git a/src/main/java/com/zdjizhi/etl/DnsMapFunction.java b/src/main/java/com/zdjizhi/etl/DnsMapFunction.java
index d5926fe..86c4616 100644
--- a/src/main/java/com/zdjizhi/etl/DnsMapFunction.java
+++ b/src/main/java/com/zdjizhi/etl/DnsMapFunction.java
@@ -1,12 +1,13 @@
 package com.zdjizhi.etl;
 
+import cn.hutool.core.util.StrUtil;
 import cn.hutool.json.JSONArray;
 import cn.hutool.json.JSONUtil;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
 import com.google.common.base.Joiner;
 import com.zdjizhi.enums.DnsType;
 import org.apache.flink.api.common.functions.MapFunction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
@@ -16,7 +17,7 @@ import java.util.Map;
  */
 public class DnsMapFunction implements MapFunction<Map<String, Object>, Map<String, Object>> {
 
-    private static final Logger logger = LoggerFactory.getLogger(DnsMapFunction.class);
+    private static final Log logger = LogFactory.get();
 
     @Override
     public Map<String, Object> map(Map<String, Object> rawLog) throws Exception {
@@ -34,9 +35,9 @@
-            Map<String, String> resMap = (Map<String, String>) res;
-            String type = resMap.get("res_type");
-            String body = resMap.get("res_body");
+            Map<String, Object> resMap = (Map<String, Object>) res;
+            String type = StrUtil.toString(resMap.get("res_type"));
+            String body = StrUtil.toString(resMap.get("res_body"));
             if (DnsType.A.getCode().equals(type)) {
                 dnsA = Joiner.on(",").skipNulls().join(dnsA, body);
                 dnsANum++;
@@ -54,6 +55,7 @@ public class DnsMapFunction implements MapFunction<Map<String, Object>, Map<String, Object>> {
diff --git a/src/main/java/com/zdjizhi/etl/DnsProcessFunction.java b/src/main/java/com/zdjizhi/etl/DnsProcessFunction.java
deleted file mode 100644
--- a/src/main/java/com/zdjizhi/etl/DnsProcessFunction.java
+++ /dev/null
-package com.zdjizhi.etl;
-
-public class DnsProcessFunction extends ProcessWindowFunction<Map<String, Object>, Map<String, Object>, String, TimeWindow> {
-
-    private static final Logger logger = LoggerFactory.getLogger(DnsProcessFunction.class);
-
-    /**
-     * Split dns_record
-     * five types: a/aaaa/cname/mx/ns
-     *
-     * @param elements
-     * @return
-     */
-    @Override
-    public void process(String keys, Context context, Iterable<Map<String, Object>> elements, Collector<Map<String, Object>> out) {
-
-        try {
-            long startTime = System.currentTimeMillis() / 1000;
-            long endTime = System.currentTimeMillis() / 1000;
-            try {
-                Map<String, Long> distinctA = new HashMap<>();
-                Map<String, Long> distinctAAAA = new HashMap<>();
-                Map<String, Long> distinctCname = new HashMap<>();
-                Map<String, Long> distinctNs = new HashMap<>();
-                Map<String, Long> distinctMx = new HashMap<>();
-                for (Map<String, Object> log : elements) {
-                    List<String> dnsA = splitDns(log, "dns_a");
-                    List<String> dnsAAAA = splitDns(log, "dns_aaaa");
-                    List<String> dnsCname = splitDns(log, "dns_cname");
-                    List<String> dnsNs = splitDns(log, "dns_ns");
-                    List<String> dnsMx = splitDns(log, "dns_mx");
-
-                    dnsA.forEach(x -> distinctA.merge(x, 1L, Long::sum));
-                    dnsAAAA.forEach(x -> distinctAAAA.merge(x, 1L, Long::sum));
-                    dnsCname.forEach(x -> distinctCname.merge(x, 1L, Long::sum));
-                    dnsNs.forEach(x -> distinctNs.merge(x, 1L, Long::sum));
-                    dnsMx.forEach(x -> distinctMx.merge(x, 1L, Long::sum));
-
-                    long connStartTimetime = Convert.toLong(log.get("capture_time"));
-                    startTime = connStartTimetime < startTime ? connStartTimetime : startTime;
-                    endTime = connStartTimetime > endTime ? connStartTimetime : endTime;
-                }
-                getNewDns(startTime, endTime, keys, distinctA, DnsType.A.getType(), out);
-                getNewDns(startTime, endTime, keys, distinctAAAA, DnsType.AAAA.getType(), out);
-                getNewDns(startTime, endTime, keys, distinctCname, DnsType.CNAME.getType(), out);
-                getNewDns(startTime, endTime, keys, distinctNs, DnsType.NS.getType(), out);
-                getNewDns(startTime, endTime, keys, distinctMx, DnsType.MX.getType(), out);
-
-            } catch (Exception e) {
-                logger.error("failed to aggregate intermediate result set {}", e);
-            }
-        } catch (Exception e) {
-            logger.error("failed to fetch intermediate aggregation result, middleResult: {}", e);
-        }
-    }
-
-    private static List<String> splitDns(Map<String, Object> log, String key) {
-        return StrUtil.split(StrUtil.toString(log.get(key)), StrUtil.COMMA);
-    }
-
-    private void getNewDns(long startTime, long endTime, String dnsQname, Map<String, Long> distinctMap, String type, Collector<Map<String, Object>> out) {
-        for (Map.Entry<String, Long> dns : distinctMap.entrySet()) {
-            Map<String, Object> newDns = new HashMap<>();
-            newDns.put("start_time", startTime);
-            newDns.put("end_time", endTime + LOG_AGGREGATE_DURATION);
-            newDns.put("record_type", type);
-            newDns.put("qname", dnsQname);
-            newDns.put("record", dns.getKey());
-            newDns.put("sessions", dns.getValue());
-            out.collect(newDns);
-        }
-    }
-}
diff --git a/src/main/java/com/zdjizhi/etl/DnsRelationProcessFunction.java b/src/main/java/com/zdjizhi/etl/DnsRelationProcessFunction.java
index 104d2d7..04e45f8 100644
--- a/src/main/java/com/zdjizhi/etl/DnsRelationProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/DnsRelationProcessFunction.java
@@ -2,12 +2,12 @@ package com.zdjizhi.etl;
 
 import cn.hutool.core.convert.Convert;
 import cn.hutool.core.date.DateUtil;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.LinkedHashMap;
 import java.util.Map;
@@ -20,7 +20,7 @@ import static com.zdjizhi.common.FlowWriteConfig.LOG_AGGREGATE_DURATION;
  */
 public class DnsRelationProcessFunction extends ProcessWindowFunction<Map<String, Object>, Map<String, Object>, Tuple3<String, String, String>, TimeWindow> {
 
-    private static final Logger logger = LoggerFactory.getLogger(DnsRelationProcessFunction.class);
+    private static final Log logger = LogFactory.get();
 
     /**
      * Split dns_record
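The logger swap recurring through the etl package is uniform, so one sketch covers it. Hutool's no-argument LogFactory.get() resolves the calling class from the call site, which is why the explicit class literal disappears:

    // before
    private static final Logger logger = LoggerFactory.getLogger(DnsRelationProcessFunction.class);
    // after - equivalent, the class name is inferred from the caller
    private static final Log logger = LogFactory.get();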
diff --git a/src/main/java/com/zdjizhi/etl/DnsSplitFlatMapFunction.java b/src/main/java/com/zdjizhi/etl/DnsSplitFlatMapFunction.java
index 725978e..6dbe35e 100644
--- a/src/main/java/com/zdjizhi/etl/DnsSplitFlatMapFunction.java
+++ b/src/main/java/com/zdjizhi/etl/DnsSplitFlatMapFunction.java
@@ -2,15 +2,17 @@ package com.zdjizhi.etl;
 
 import cn.hutool.core.util.ObjectUtil;
 import cn.hutool.core.util.StrUtil;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.google.common.base.Splitter;
 import com.zdjizhi.enums.DnsType;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 
 /**
@@ -18,7 +20,7 @@ import java.util.Map;
  */
 public class DnsSplitFlatMapFunction extends RichFlatMapFunction<Map<String, Object>, Map<String, Object>> {
 
-    private static final Logger logger = LoggerFactory.getLogger(DnsSplitFlatMapFunction.class);
+    private static final Log logger = LogFactory.get();
 
     /**
      * Split dns_record
@@ -65,8 +67,10 @@ public class DnsSplitFlatMapFunction extends RichFlatMapFunction<Map<String, Object>, Map<String, Object>> {
 
     private static List<String> splitDns(Map<String, Object> log, String key) {
-
-        return StrUtil.split(StrUtil.toString(log.get(key)), StrUtil.COMMA);
+        if (Objects.isNull(log.get(key))) {
+            return null;
+        }
+        return Splitter.on(StrUtil.COMMA).trimResults().omitEmptyStrings().splitToList(StrUtil.toString(log.get(key)));
     }
 }
diff --git a/src/main/java/com/zdjizhi/etl/Ip2IpGraphProcessFunction.java b/src/main/java/com/zdjizhi/etl/Ip2IpGraphProcessFunction.java
index 0181b2d..5bce25d 100644
--- a/src/main/java/com/zdjizhi/etl/Ip2IpGraphProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/Ip2IpGraphProcessFunction.java
@@ -2,40 +2,39 @@ package com.zdjizhi.etl;
 
 import cn.hutool.core.convert.Convert;
 import cn.hutool.core.date.DateUtil;
-import com.arangodb.entity.BaseDocument;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
 import java.util.Map;
 
 /**
  * deduplicate ip pairs
 */
-public class Ip2IpGraphProcessFunction extends ProcessWindowFunction<Map<String, Object>, BaseDocument, Tuple2<String, String>, TimeWindow> {
+public class Ip2IpGraphProcessFunction extends ProcessWindowFunction<Map<String, Object>, Map<String, Object>, Tuple2<String, String>, TimeWindow> {
 
-    private static final Logger logger = LoggerFactory.getLogger(Ip2IpGraphProcessFunction.class);
+    private static final Log logger = LogFactory.get();
 
     @Override
-    public void process(Tuple2<String, String> keys, Context context, Iterable<Map<String, Object>> elements, Collector<BaseDocument> out) {
+    public void process(Tuple2<String, String> keys, Context context, Iterable<Map<String, Object>> elements, Collector<Map<String, Object>> out) {
         try {
             long lastFoundTime = DateUtil.currentSeconds();
             for (Map<String, Object> log : elements) {
-                long connStartTimetime = Convert.toLong(log.get("conn_start_time"));
+                long connStartTimetime = Convert.toLong(log.get("start_time"));
                 lastFoundTime = connStartTimetime > lastFoundTime ? connStartTimetime : lastFoundTime;
             }
-            BaseDocument baseDocument = new BaseDocument();
-            baseDocument.setKey(String.join("-", keys.f0, keys.f1));
-            baseDocument.addAttribute("src_ip", keys.f0);
-            baseDocument.addAttribute("dst_ip", keys.f1);
-            baseDocument.addAttribute("last_found_time", lastFoundTime);
-            out.collect(baseDocument);
-            logger.debug("intermediate aggregation result: {}", baseDocument.toString());
+            Map<String, Object> newLog = new HashMap<>();
+            newLog.put("src_ip", keys.f0);
+            newLog.put("dst_ip", keys.f1);
+            newLog.put("last_found_time", lastFoundTime);
+            out.collect(newLog);
+            logger.debug("intermediate aggregation result: {}", newLog.toString());
 
         } catch (Exception e) {
             logger.error("failed to fetch intermediate aggregation result, middleResult: {}", e);
diff --git a/src/main/java/com/zdjizhi/etl/SketchProcessFunction.java b/src/main/java/com/zdjizhi/etl/SketchProcessFunction.java
index 09317fe..54d53b6 100644
--- a/src/main/java/com/zdjizhi/etl/SketchProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/SketchProcessFunction.java
@@ -1,13 +1,13 @@
 package com.zdjizhi.etl;
 
 import cn.hutool.core.convert.Convert;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.LinkedHashMap;
 import java.util.Map;
@@ -20,7 +20,7 @@ import static com.zdjizhi.common.FlowWriteConfig.LOG_AGGREGATE_DURATION;
  */
 public class SketchProcessFunction extends ProcessWindowFunction<Map<String, Object>, Map<String, Object>, Tuple2<String, String>, TimeWindow> {
 
-    private static final Logger logger = LoggerFactory.getLogger(SketchProcessFunction.class);
+    private static final Log logger = LogFactory.get();
 
     @Override
     public void process(Tuple2<String, String> keys, Context context, Iterable<Map<String, Object>> elements, Collector<Map<String, Object>> out) {
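The new splitDns above now short-circuits to null when the field is absent, so callers need a null guard before iterating. A hedged sketch of the expected call pattern (the flatMap body that consumes it is not shown in this hunk; the shape follows the deleted DnsProcessFunction):

    List<String> dnsA = splitDns(log, "dns_a");
    if (dnsA != null) {
        dnsA.forEach(x -> distinctA.merge(x, 1L, Long::sum));
    }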
diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
index f28d79a..0ed5052 100644
--- a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
+++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
@@ -4,19 +4,16 @@ import cn.hutool.core.convert.Convert;
 import cn.hutool.core.util.ObjectUtil;
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
-import com.arangodb.entity.BaseDocument;
 import com.zdjizhi.common.*;
 import com.zdjizhi.enums.DnsType;
 import com.zdjizhi.etl.*;
 import com.zdjizhi.utils.arangodb.ArangoDBSink;
-import com.zdjizhi.utils.ck.ClickhouseSingleSink;
 import com.zdjizhi.utils.ck.ClickhouseSink;
 import com.zdjizhi.utils.kafka.KafkaConsumer;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
 
 import java.time.Duration;
@@ -55,40 +52,36 @@ public class LogFlowWriteTopology {
             DataStream<Map<String, Object>> connTransformStream = connSource.assignTimestampsAndWatermarks(WatermarkStrategy
                     .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_ORDERNESS))
                     .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("conn_start_time")) * 1000))
                     .keyBy(new IpKeysSelector())
-                    .window(TumblingEventTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION)))
+                    .window(TumblingProcessingTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION)))
                     .process(new ConnProcessFunction())
                     .filter(x -> Objects.nonNull(x))
                     .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
-            connTransformStream.print();
 
             DataStream<Map<String, Object>> sketchTransformStream = sketchSource.assignTimestampsAndWatermarks(WatermarkStrategy
                     .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_ORDERNESS))
                     .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("sketch_start_time")) * 1000))
                     .keyBy(new IpKeysSelector())
-                    .window(TumblingEventTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION)))
+                    .window(TumblingProcessingTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION)))
                     .process(new SketchProcessFunction())
                     .filter(Objects::nonNull)
                     .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
 
+
             // into ArangoDB
-            SingleOutputStreamOperator<BaseDocument> ip2ipGraph = connTransformStream.union(sketchTransformStream)
+            DataStream<Map<String, Object>> ip2ipGraph = connTransformStream.union(sketchTransformStream)
                     .keyBy(new IpKeysSelector())
-                    .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
+                    .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
                     .process(new Ip2IpGraphProcessFunction())
-                    .filter(Objects::nonNull)
+//                    .filter(Objects::nonNull)
                     .setParallelism(TRANSFORM_PARALLELISM);
 
             // write to the CK sink, batched
-            connSource.addSink(new ClickhouseSingleSink(SINK_CK_TABLE_CONNECTION)).name("CKSink");
-            connTransformStream.print();
-            connTransformStream.addSink(new ClickhouseSingleSink(SINK_CK_TABLE_RELATION_CONNECTION)).name("CKSink");
-            sketchSource.keyBy(new SketchKeysSelector()).process(new CKDelayProcess(SINK_CK_TABLE_SKETCH)).addSink(new ClickhouseSink(SINK_CK_TABLE_SKETCH)).name("CKSink");
-            connTransformStream.keyBy(new StartTimeKeysSelector()).process(new CKDelayProcess(SINK_CK_TABLE_RELATION_CONNECTION)).addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_CONNECTION)).name("CKSink");
-            sketchTransformStream.keyBy(new StartTimeKeysSelector()).process(new CKDelayProcess(SINK_CK_TABLE_RELATION_CONNECTION)).addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_CONNECTION)).name("CKSink");
-
-            ip2ipGraph.keyBy("key").process(new ArangoDelayProcess(R_VISIT_IP2IP)).addSink(new ArangoDBSink(R_VISIT_IP2IP)).name(R_VISIT_IP2IP);
-
+            connSource.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT))).apply(new CKWindow()).addSink(new ClickhouseSink(SINK_CK_TABLE_CONNECTION)).name("CKSink");
+            sketchSource.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT))).apply(new CKWindow()).addSink(new ClickhouseSink(SINK_CK_TABLE_SKETCH)).name("CKSink");
+            sketchTransformStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT))).apply(new CKWindow()).addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_CONNECTION)).name("CKSink");
+            // write to arangodb
+            ip2ipGraph.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT))).apply(new ArangodbIPWindow()).addSink(new ArangoDBSink(R_VISIT_IP2IP)).name(R_VISIT_IP2IP);
 
         } else if (FlowWriteConfig.LOG_NEED_COMPLETE == 2) {
 
@@ -103,7 +96,7 @@ public class LogFlowWriteTopology {
                     .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("capture_time")) * 1000))
                     .flatMap(new DnsSplitFlatMapFunction())
                     .keyBy(new DnsGraphKeysSelector())
-                    .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
+                    .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
                     .process(new DnsRelationProcessFunction())
                     .filter(Objects::nonNull)
                     .setParallelism(TRANSFORM_PARALLELISM);
@@ -111,30 +104,27 @@ public class LogFlowWriteTopology {
             // dns raw logs into CK
             dnsSource.filter(Objects::nonNull)
                     .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
-                    .keyBy(new DnsTimeKeysSelector())
-                    .process(new CKDelayProcess(SINK_CK_TABLE_DNS))
+                    .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT))).apply(new CKWindow())
                     .addSink(new ClickhouseSink(SINK_CK_TABLE_DNS))
                     .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
                     .name("CKSink");
 
             // dns relation logs (after the split) into CK
-            dnsTransform.keyBy(new StartTimeKeysSelector()).process(new CKDelayProcess(SINK_CK_TABLE_DNS))
-                    .addSink(new ClickhouseSink(SINK_CK_TABLE_DNS))
+            dnsTransform.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT))).apply(new CKWindow())
+                    .addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_DNS))
                     .setParallelism(SINK_PARALLELISM)
                     .name("CKSink");
 
             // into arango, routed to different tables by record_type
             DataStream<Map<String, Object>> dnsGraph = dnsTransform.keyBy(new DnsGraphKeysSelector())
-                    .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
+                    .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
                    .process(new DnsGraphProcessFunction())
                    .setParallelism(SINK_PARALLELISM)
                    .filter(Objects::nonNull);
 
             for (DnsType dnsEnum : DnsType.values()) {
                 dnsGraph.filter(x -> ObjectUtil.equal(dnsEnum.getType(), x.get("record_type")))
-                        .keyBy(new StartTimeKeysSelector())
-                        .map(new DnsGraphMapFunction())
-                        .process(new ArangoDelayProcess(dnsEnum.getSink()))
+                        .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT))).apply(new ArangodbDnsWindow())
                         .addSink(new ArangoDBSink(dnsEnum.getSink()))
                         .setParallelism(SINK_PARALLELISM)
                         .name("ArangodbSink");
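The recurring change in this file is TumblingEventTimeWindows to TumblingProcessingTimeWindows, even though the watermark assigners are kept. The practical difference, sketched:

    // event time: fires when the watermark passes the window end;
    // elements are grouped by their extracted timestamps
    .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
    // processing time: fires on the operator's wall clock;
    // elements are grouped by arrival time and watermarks are ignored
    .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))

With processing-time windows the assignTimestampsAndWatermarks calls no longer influence window firing, which is consistent with flink.watermark.max.orderness dropping from 100000 to 50 in the properties file.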
diff --git a/src/main/java/com/zdjizhi/utils/arangodb/ArangoDBConnect.java b/src/main/java/com/zdjizhi/utils/arangodb/ArangoDBConnect.java
index d2306e1..caf4e79 100644
--- a/src/main/java/com/zdjizhi/utils/arangodb/ArangoDBConnect.java
+++ b/src/main/java/com/zdjizhi/utils/arangodb/ArangoDBConnect.java
@@ -1,5 +1,7 @@
 package com.zdjizhi.utils.arangodb;
 
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
 import com.arangodb.ArangoCollection;
 import com.arangodb.ArangoCursor;
 import com.arangodb.ArangoDB;
@@ -11,22 +13,21 @@ import com.arangodb.model.AqlQueryOptions;
 import com.arangodb.model.DocumentCreateOptions;
 import com.arangodb.util.MapBuilder;
 import com.zdjizhi.common.FlowWriteConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
 public class ArangoDBConnect {
-    private static final Logger LOG = LoggerFactory.getLogger(ArangoDBConnect.class);
+    private static final Log logger = LogFactory.get();
     private static ArangoDB arangoDB = null;
     private static ArangoDBConnect conn = null;
+
     static {
         getArangoDB();
     }
 
-    private static void getArangoDB(){
+    private static void getArangoDB() {
         arangoDB = new ArangoDB.Builder()
                 .maxConnections(FlowWriteConfig.THREAD_POOL_NUMBER)
                 .host(FlowWriteConfig.ARANGODB_HOST, FlowWriteConfig.ARANGODB_PORT)
@@ -35,59 +36,59 @@ public class ArangoDBConnect {
                 .build();
     }
 
-    public static synchronized ArangoDBConnect getInstance(){
-        if (null == conn){
+    public static synchronized ArangoDBConnect getInstance() {
+        if (null == conn) {
             conn = new ArangoDBConnect();
         }
         return conn;
     }
 
-    private ArangoDatabase getDatabase(){
+    private ArangoDatabase getDatabase() {
         return arangoDB.db(FlowWriteConfig.ARANGODB_DB_NAME);
     }
 
-    public void clean(){
+    public void clean() {
         try {
-            if (arangoDB != null){
+            if (arangoDB != null) {
                 arangoDB.shutdown();
             }
-        }catch (Exception e){
-            LOG.error(e.getMessage());
+        } catch (Exception e) {
+            logger.error(e.getMessage());
         }
     }
 
-    public ArangoCursor executorQuery(String query,Class type){
+    public ArangoCursor executorQuery(String query, Class type) {
         ArangoDatabase database = getDatabase();
         Map bindVars = new MapBuilder().get();
         AqlQueryOptions options = new AqlQueryOptions()
                 .ttl(FlowWriteConfig.ARANGODB_TTL);
         try {
             return database.query(query, bindVars, options, type);
-        }catch (Exception e){
-            LOG.error(e.getMessage());
+        } catch (Exception e) {
+            logger.error(e.getMessage());
             return null;
-        }finally {
+        } finally {
             bindVars.clear();
         }
     }
 
-    public void overwrite(List docOverwrite, String collectionName){
+    public void overwrite(List docOverwrite, String collectionName) {
         ArangoDatabase database = getDatabase();
         try {
             ArangoCollection collection = database.collection(collectionName);
-            if (!docOverwrite.isEmpty()){
+            if (!docOverwrite.isEmpty()) {
                 DocumentCreateOptions documentCreateOptions = new DocumentCreateOptions();
                 documentCreateOptions.overwrite(true);
                 documentCreateOptions.silent(true);
                 MultiDocumentEntity<DocumentCreateEntity> documentCreateEntityMultiDocumentEntity = collection.insertDocuments(docOverwrite, documentCreateOptions);
                 Collection<ErrorEntity> errors = documentCreateEntityMultiDocumentEntity.getErrors();
-                for (ErrorEntity errorEntity:errors){
-                    LOG.error("error writing to arangoDB: " + errorEntity.getErrorMessage());
+                for (ErrorEntity errorEntity : errors) {
+                    logger.error("error writing to arangoDB: {}", errorEntity.getErrorMessage());
                 }
             }
-        }catch (Exception e){
-            LOG.error("update failed: "+e.toString());
-        }finally {
+        } catch (Exception e) {
+            logger.error("update failed: " + e.toString());
+        } finally {
             docOverwrite.clear();
         }
     }
diff --git a/src/main/java/com/zdjizhi/utils/arangodb/ArangoDBSink.java b/src/main/java/com/zdjizhi/utils/arangodb/ArangoDBSink.java
index a14ce16..d2306b7 100644
--- a/src/main/java/com/zdjizhi/utils/arangodb/ArangoDBSink.java
+++ b/src/main/java/com/zdjizhi/utils/arangodb/ArangoDBSink.java
@@ -1,6 +1,6 @@
 package com.zdjizhi.utils.arangodb;
 
-import com.arangodb.entity.BaseDocument;
+import com.arangodb.entity.BaseEdgeDocument;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 
@@ -11,14 +11,15 @@ import java.util.List;
  * @author: zhq
 * @create: 2022-07-07
  **/
-public class ArangoDBSink extends RichSinkFunction<List<BaseDocument>> {
+public class ArangoDBSink extends RichSinkFunction<List<BaseEdgeDocument>> {
 
     private static ArangoDBConnect arangoDBConnect;
     private String collection;
 
     @Override
-    public void invoke(List<BaseDocument> baseDocuments, Context context) throws Exception {
-        arangoDBConnect.overwrite(baseDocuments, getCollection());
+    public void invoke(List<BaseEdgeDocument> baseEdgeDocuments, Context context) throws Exception {
+
+        arangoDBConnect.overwrite(baseEdgeDocuments, getCollection());
     }
 
     @Override
@@ -44,4 +45,5 @@ public class ArangoDBSink extends RichSinkFunction<List<BaseEdgeDocument>> {
     public void setCollection(String collection) {
         this.collection = collection;
     }
+
 }
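ArangoDBConnect.overwrite above relies on DocumentCreateOptions.overwrite(true), which makes insertDocuments behave as an upsert on _key; that is why ArangodbDnsWindow and ArangodbIPWindow build deterministic keys with setKey(String.join("-", ...)). A condensed sketch of the semantics:

    DocumentCreateOptions opts = new DocumentCreateOptions();
    opts.overwrite(true);   // replace any existing document with the same _key
    opts.silent(true);      // do not return the created documents
    collection.insertDocuments(batch, opts);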
diff --git a/src/main/java/com/zdjizhi/utils/ck/BalancedClickhouseDataSource.java b/src/main/java/com/zdjizhi/utils/ck/BalancedClickhouseDataSource.java
new file mode 100644
index 0000000..2449244
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/ck/BalancedClickhouseDataSource.java
@@ -0,0 +1,335 @@
+/*
+package com.zdjizhi.utils.ck;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import ru.yandex.clickhouse.ClickHouseConnection;
+import ru.yandex.clickhouse.ClickHouseDriver;
+import ru.yandex.clickhouse.ClickhouseJdbcUrlParser;
+import ru.yandex.clickhouse.settings.ClickHouseProperties;
+
+import javax.sql.DataSource;
+import java.io.PrintWriter;
+import java.net.URISyntaxException;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+*/
+/**
+ * datasource implementation providing load balancing
+ *//*
+
+public class BalancedClickhouseDataSource implements DataSource {
+    private static final Logger log = LoggerFactory.getLogger(BalancedClickhouseDataSource.class);
+    private static final Pattern URL_TEMPLATE = Pattern.compile("jdbc:clickhouse://([a-zA-Z0-9_:,.-]+)(/[a-zA-Z0-9_]+([?][a-zA-Z0-9_]+[=][a-zA-Z0-9_]+([&][a-zA-Z0-9_]+[=][a-zA-Z0-9_]+)*)?)?");
+    private PrintWriter printWriter;
+    private int loginTimeoutSeconds;
+    // random generator
+    private final ThreadLocal randomThreadLocal;
+    // all urls
+    private final List allUrls;
+    // enabled urls
+    private volatile List enabledUrls;
+    private final ClickHouseProperties properties;
+    private final ClickHouseDriver driver;
+
+    public BalancedClickhouseDataSource(String url) {
+        this(splitUrl(url), getFromUrl(url));
+    }
+
+    public BalancedClickhouseDataSource(String url, Properties properties) {
+        this(splitUrl(url), new ClickHouseProperties(properties));
+    }
+
+    public BalancedClickhouseDataSource(String url, ClickHouseProperties properties) {
+        this(splitUrl(url), properties.merge(getFromUrlWithoutDefault(url)));
+    }
+
+    private BalancedClickhouseDataSource(List urls) {
+        this(urls, new ClickHouseProperties());
+    }
+
+    private BalancedClickhouseDataSource(List urls, Properties info) {
+        this(urls, new ClickHouseProperties(info));
+    }
+
+    private BalancedClickhouseDataSource(List urls, ClickHouseProperties properties) {
+        this.loginTimeoutSeconds = 0;
+        this.randomThreadLocal = new ThreadLocal();
+        this.driver = new ClickHouseDriver();
+        if (urls.isEmpty()) {
+            throw new IllegalArgumentException("Incorrect ClickHouse jdbc url list. It must be not empty");
+        } else {
+            try {
+                // parse the configuration
+                ClickHouseProperties localProperties = ClickhouseJdbcUrlParser.parse((String)urls.get(0), properties.asProperties());
+                localProperties.setHost((String)null);
+                localProperties.setPort(-1);
+                this.properties = localProperties;
+            } catch (URISyntaxException var8) {
+                throw new IllegalArgumentException(var8);
+            }
+
+            List allUrls = new ArrayList(urls.size());
+            Iterator var4 = urls.iterator();
+
+            while(var4.hasNext()) {
+                String url = (String)var4.next();
+
+                try {
+                    // if the url is valid
+                    if (this.driver.acceptsURL(url)) {
+                        // add it to the full url list
+                        allUrls.add(url);
+                    } else {
+                        log.error("that url is has not correct format: {}", url);
+                    }
+                } catch (SQLException var7) {
+                    throw new IllegalArgumentException("error while checking url: " + url, var7);
+                }
+            }
+
+            if (allUrls.isEmpty()) {
+                throw new IllegalArgumentException("there are no correct urls");
+            } else {
+                // all urls
+                this.allUrls = Collections.unmodifiableList(allUrls);
+                // enabled urls
+                this.enabledUrls = this.allUrls;
+            }
+        }
+    }
+
+    */
+/**
+ * split the url
+ * @param url
+ * @return
+ *//*
+
+    static List splitUrl(String url) {
+        // validate the url
+        Matcher m = URL_TEMPLATE.matcher(url);
+        if (!m.matches()) {
+            throw new IllegalArgumentException("Incorrect url");
+        } else {
+            String database = m.group(2);
+            if (database == null) {
+                database = "";
+            }
+
+            // split the host list
+            String[] hosts = m.group(1).split(",");
+            List result = new ArrayList(hosts.length);
+            String[] var5 = hosts;
+            int var6 = hosts.length;
+
+            // iterate and add each split url
+            for(int var7 = 0; var7 < var6; ++var7) {
+                String host = var5[var7];
+                result.add("jdbc:clickhouse://" + host + database);
+            }
+
+            return result;
+        }
+    }
+
+    */
+/**
+ * ping a url to see whether it is usable
+ * @param url
+ * @return
+ *//*
+
+    private boolean ping(String url) {
+        try {
+            // run a trivial sql statement to test connectivity
+            this.driver.connect(url, this.properties).createStatement().execute("SELECT 1");
+            return true;
+        } catch (Exception var3) {
+            return false;
+        }
+    }
+
+    */
+/**
+ * walk all urls and, via ping, select the usable ones
+ * @return
+ *//*
+
+    public synchronized int actualize() {
+        // build a fresh enabled-url list
+        List enabledUrls = new ArrayList(this.allUrls.size());
+        Iterator var2 = this.allUrls.iterator();
+
+        while(var2.hasNext()) {
+            String url = (String)var2.next();
+            log.debug("Pinging disabled url: {}", url);
+            if (this.ping(url)) {
+                log.debug("Url is alive now: {}", url);
+                // only urls that answer the ping are added
+                enabledUrls.add(url);
+            } else {
+                log.debug("Url is dead now: {}", url);
+            }
+        }
+
+        // reset the enabled url list
+        this.enabledUrls = Collections.unmodifiableList(enabledUrls);
+        return enabledUrls.size();
+    }
+
+    */
+/**
+ * return a random usable url
+ * @return
+ * @throws java.sql.SQLException
+ *//*
+
+    private String getAnyUrl() throws SQLException {
+        // enabled url list
+        List localEnabledUrls = this.enabledUrls;
+        if (localEnabledUrls.isEmpty()) {
+            throw new SQLException("Unable to get connection: there are no enabled urls");
+        } else {
+            Random random = (Random)this.randomThreadLocal.get();
+            if (random == null) {
+                this.randomThreadLocal.set(new Random());
+                // create a random generator
+                random = (Random)this.randomThreadLocal.get();
+            }
+
+            int index = random.nextInt(localEnabledUrls.size());
+            // pick a random enabled url
+            return (String)localEnabledUrls.get(index);
+        }
+    }
+
+    public ClickHouseConnection getConnection() throws SQLException {
+        return this.driver.connect(this.getAnyUrl(), this.properties);
+    }
+
+    public ClickHouseConnection getConnection(String username, String password) throws SQLException {
+        return this.driver.connect(this.getAnyUrl(), this.properties.withCredentials(username, password));
+    }
+
+    public <T> T unwrap(Class<T> iface) throws SQLException {
+        if (iface.isAssignableFrom(this.getClass())) {
+            return iface.cast(this);
+        } else {
+            throw new SQLException("Cannot unwrap to " + iface.getName());
+        }
+    }
+
+    public boolean isWrapperFor(Class iface) throws SQLException {
+        return iface.isAssignableFrom(this.getClass());
+    }
+
+    public PrintWriter getLogWriter() throws SQLException {
+        return this.printWriter;
+    }
+
+    public void setLogWriter(PrintWriter printWriter) throws SQLException {
+        this.printWriter = printWriter;
+    }
+
+    public void setLoginTimeout(int seconds) throws SQLException {
+        this.loginTimeoutSeconds = seconds;
+    }
+
+    public int getLoginTimeout() throws SQLException {
+        return this.loginTimeoutSeconds;
+    }
+
+    public java.util.logging.Logger getParentLogger() throws SQLFeatureNotSupportedException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    */
+/**
+ * periodically clean up unused connections
+ * @param rate
+ * @param timeUnit
+ * @return
+ *//*
+
+    public BalancedClickhouseDataSource withConnectionsCleaning(int rate, TimeUnit timeUnit) {
+        this.driver.scheduleConnectionsCleaning(rate, timeUnit);
+        return this;
+    }
+
+    */
+/**
+ * periodically re-check urls via a scheduled task, keeping the enabled-url list fresh
+ * @param delay
+ * @param timeUnit
+ * @return
+ *//*
+
+    public BalancedClickhouseDataSource scheduleActualization(int delay, TimeUnit timeUnit) {
+        ScheduledConnectionCleaner.INSTANCE.scheduleWithFixedDelay(new Runnable() {
+            public void run() {
+                try {
+                    BalancedClickhouseDataSource.this.actualize();
+                } catch (Exception var2) {
+                    BalancedClickhouseDataSource.log.error("Unable to actualize urls", var2);
+                }
+
+            }
+        }, 0L, (long)delay, timeUnit);
+        return this;
+    }
+
+    public List getAllClickhouseUrls() {
+        return this.allUrls;
+    }
+
+    public List getEnabledClickHouseUrls() {
+        return this.enabledUrls;
+    }
+
+    */
+/**
+ * return the collection of unusable urls,
+ * found as the difference between all and enabled
+ *
+ * @return
+ *//*
+
+    public List getDisabledUrls() {
+        List enabledUrls = this.enabledUrls;
+        if (!this.hasDisabledUrls()) {
+            return Collections.emptyList();
+        } else {
+            List disabledUrls = new ArrayList(this.allUrls);
+            disabledUrls.removeAll(enabledUrls);
+            return disabledUrls;
+        }
+    }
+
+    public boolean hasDisabledUrls() {
+        return this.allUrls.size() != this.enabledUrls.size();
+    }
+
+    public ClickHouseProperties getProperties() {
+        return this.properties;
+    }
+
+    private static ClickHouseProperties getFromUrl(String url) {
+        return new ClickHouseProperties(getFromUrlWithoutDefault(url));
+    }
+
+    private static Properties getFromUrlWithoutDefault(String url) {
+        if (StringUtils.isBlank(url)) {
+            return new Properties();
+        } else {
+            int index = url.indexOf("?");
+            return index == -1 ? new Properties() : ClickhouseJdbcUrlParser.parseUriQueryPart(url.substring(index + 1), new Properties());
+        }
+    }
+}*/
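The whole vendored class above is kept only as a block comment: ClickhouseSink and ClickhouseUtil now import the identically named ru.yandex.clickhouse.BalancedClickhouseDataSource that ships with clickhouse-jdbc 0.2.6, which exposes the same API this copy mirrors. A hedged usage sketch (host list shortened for illustration):

    BalancedClickhouseDataSource ds = new BalancedClickhouseDataSource(
            "jdbc:clickhouse://node01:8123,node02:8123/tsg_galaxy_v3", props);
    ds.scheduleActualization(10, TimeUnit.SECONDS);  // re-ping the urls every 10 s
    Connection connection = ds.getConnection();      // a random enabled url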
diff --git a/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java b/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java
index 98afe4c..cb37500 100644
--- a/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java
+++ b/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java
@@ -6,15 +6,17 @@ import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import ru.yandex.clickhouse.BalancedClickhouseDataSource;
+import ru.yandex.clickhouse.settings.ClickHouseProperties;
 
 import java.sql.Connection;
-import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static com.zdjizhi.common.FlowWriteConfig.*;
@@ -23,21 +25,10 @@ public class ClickhouseSink extends RichSinkFunction<List<Map<String, Object>>> {
 
     private static final Log log = LogFactory.get();
 
-    private static Connection connection;
-    private static PreparedStatement preparedStatement;
+    private Connection connection;
+    private PreparedStatement preparedStatement;
     public String sink;
 
-    static {
-        try {
-            Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
-            connection = DriverManager.getConnection("jdbc:clickhouse://" + CK_HOSTS + "/" + CK_DATABASE, CK_USERNAME, CK_PIN);
-//            BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://node01:8123,node02:8123,node03:8123/default", props);
-//            connection = dataSource.getConnection();
-            log.info("get clickhouse connection success");
-        } catch (ClassNotFoundException | SQLException e) {
-            log.error("clickhouse connection error ,{}", e);
-        }
-    }
 
     public ClickhouseSink(String sink) {
         this.sink = sink;
@@ -59,16 +50,32 @@ public class ClickhouseSink extends RichSinkFunction<List<Map<String, Object>>> {
 
     @Override
     public void open(Configuration parameters) throws Exception {
+        try {
+//            Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
+//            connection = DriverManager.getConnection("jdbc:clickhouse://" + CK_HOSTS + "/" + CK_DATABASE, CK_USERNAME, CK_PIN);
+            ClickHouseProperties properties = new ClickHouseProperties();
+            properties.setDatabase(CK_DATABASE);
+            properties.setUser(CK_USERNAME);
+            properties.setPassword(CK_PIN);
+//            properties.setKeepAliveTimeout(5);
+            properties.setConnectionTimeout(CK_CONNECTION_TIMEOUT);
+            properties.setSocketTimeout(CK_SOCKET_TIMEOUT);
+
+            BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://" + CK_HOSTS, properties);
+            dataSource.scheduleActualization(10, TimeUnit.SECONDS);    //enable periodic url availability checks
+            connection = dataSource.getConnection();
+
+            log.info("get clickhouse connection success");
+        } catch (SQLException e) {
+            log.error("clickhouse connection error ,{}", e);
+        }
     }
 
     @Override
     public void close() throws Exception {
-        if (null != connection) {
-            connection.close();
-        }
-        if (null != preparedStatement) {
-            preparedStatement.close();
-        }
+        IoUtil.close(preparedStatement);
+        IoUtil.close(connection);
     }
 
     public void executeInsert(List<Map<String, Object>> data, String tableName) {
@@ -109,9 +116,6 @@ public class ClickhouseSink extends RichSinkFunction<List<Map<String, Object>>> {
 
         } catch (Exception ex) {
            log.error("ClickhouseSink insert error", ex);
-        } finally {
-            IoUtil.close(preparedStatement);
-            IoUtil.close(connection);
        }
    }
 
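The behavioral fix in this hunk is easy to miss: the old executeInsert closed the (static, shared) PreparedStatement and Connection in its finally block, so the first completed batch would kill the connection for every later write, and the static initializer made the failure hard to see. The lifecycle this change converges on, sketched as a generic Flink sink skeleton (class and method names here are illustrative, not part of the patch):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.util.List;
    import java.util.Map;

    // Illustrative skeleton: one connection per parallel sink instance,
    // acquired in open(), reused by every invoke(), released only in close().
    public abstract class JdbcLifecycleSink extends RichSinkFunction<List<Map<String, Object>>> {

        private transient Connection connection;

        @Override
        public void open(Configuration parameters) throws Exception {
            connection = acquireConnection();   // e.g. dataSource.getConnection()
        }

        @Override
        public void invoke(List<Map<String, Object>> batch, Context context) throws Exception {
            try (PreparedStatement ps = connection.prepareStatement(insertSql(batch))) {
                // bind and addBatch per row, then executeBatch + commit;
                // the connection itself deliberately stays open here
            }
        }

        @Override
        public void close() throws Exception {
            if (connection != null) {
                connection.close();             // the only place the connection is released
            }
        }

        protected abstract Connection acquireConnection() throws Exception;

        protected abstract String insertSql(List<Map<String, Object>> batch);
    }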
diff --git a/src/main/java/com/zdjizhi/utils/ck/ClickhouseUtil.java b/src/main/java/com/zdjizhi/utils/ck/ClickhouseUtil.java
index 9ebdeb5..975731c 100644
--- a/src/main/java/com/zdjizhi/utils/ck/ClickhouseUtil.java
+++ b/src/main/java/com/zdjizhi/utils/ck/ClickhouseUtil.java
@@ -7,7 +7,6 @@ import ru.yandex.clickhouse.BalancedClickhouseDataSource;
 import ru.yandex.clickhouse.settings.ClickHouseProperties;
 
 import java.sql.Connection;
-import java.sql.DriverManager;
 import java.sql.SQLException;
 
 import static com.zdjizhi.common.FlowWriteConfig.*;
@@ -26,7 +25,7 @@ public class ClickhouseUtil {
 
     public static Connection getConnection() {
         try {
-            Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
+//            Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
 //            connection = DriverManager.getConnection("jdbc:clickhouse://" + CK_HOSTS + "/" + CK_DATABASE, CK_USERNAME, CK_PIN);
             ClickHouseProperties properties = new ClickHouseProperties();
             properties.setDatabase(CK_DATABASE);
@@ -40,7 +39,7 @@ public class ClickhouseUtil {
 
             log.info("get clickhouse connection success");
             return connection;
-        } catch (ClassNotFoundException | SQLException e) {
+        } catch (SQLException e) {
             log.error("clickhouse connection error ,{}", e);
         }
         return null;
diff --git a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
index ddb29ed..1274511 100644
--- a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
+++ b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
@@ -5,17 +5,11 @@ import cn.hutool.log.LogFactory;
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
-import com.alibaba.nacos.api.NacosFactory;
-import com.alibaba.nacos.api.PropertyKeyConst;
-import com.alibaba.nacos.api.config.ConfigService;
-import com.alibaba.nacos.api.config.listener.Listener;
-import com.alibaba.nacos.api.exception.NacosException;
 import com.jayway.jsonpath.JsonPath;
 import com.zdjizhi.common.FlowWriteConfig;
 import com.zdjizhi.utils.StringUtil;
 
 import java.util.*;
-import java.util.concurrent.Executor;
 
 
 /**
@@ -44,40 +38,6 @@ public class JsonParseUtil {
      */
     private static ArrayList<String[]> jobList;
 
-    static {
-        propNacos.setProperty(PropertyKeyConst.SERVER_ADDR, FlowWriteConfig.NACOS_SERVER);
-        propNacos.setProperty(PropertyKeyConst.NAMESPACE, FlowWriteConfig.NACOS_SCHEMA_NAMESPACE);
-        propNacos.setProperty(PropertyKeyConst.USERNAME, FlowWriteConfig.NACOS_USERNAME);
-        propNacos.setProperty(PropertyKeyConst.PASSWORD, FlowWriteConfig.NACOS_PIN);
-        try {
-            ConfigService configService = NacosFactory.createConfigService(propNacos);
-            String dataId = FlowWriteConfig.NACOS_DATA_ID;
-            String group = FlowWriteConfig.NACOS_GROUP;
-            String schema = configService.getConfig(dataId, group, 5000);
-            if (StringUtil.isNotBlank(schema)) {
-                jsonFieldsMap = getFieldsFromSchema(schema);
-                jobList = getJobListFromHttp(schema);
-            }
-            configService.addListener(dataId, group, new Listener() {
-                @Override
-                public Executor getExecutor() {
-                    return null;
-                }
-
-                @Override
-                public void receiveConfigInfo(String configMsg) {
-                    if (StringUtil.isNotBlank(configMsg)) {
-                        clearCache();
-                        jsonFieldsMap = getFieldsFromSchema(configMsg);
-                        jobList = getJobListFromHttp(configMsg);
-                    }
-                }
-            });
-        } catch (NacosException e) {
-            logger.error("Get Schema config from Nacos error,The exception message is :" + e.getMessage());
-        }
-    }
-
     /**
      * Pattern matching: given a type string, return the corresponding class type
      *
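Commenting out Class.forName here should be safe: clickhouse-jdbc ships a META-INF/services/java.sql.Driver descriptor, so any JDBC 4 runtime discovers ru.yandex.clickhouse.ClickHouseDriver through ServiceLoader as soon as the jar is on the classpath. A quick way to confirm that without reflective loading (url and credentials are placeholders):

    import java.sql.Connection;
    import java.sql.DriverManager;

    public class DriverAutoLoadDemo {
        public static void main(String[] args) throws Exception {
            // No Class.forName needed: JDBC 4 drivers self-register through
            // META-INF/services/java.sql.Driver on the classpath.
            try (Connection c = DriverManager.getConnection(
                    "jdbc:clickhouse://192.168.45.102:8123/tsg_galaxy_v3", "default", "***")) {
                System.out.println("driver resolved: " + c.getMetaData().getDriverName());
            }
        }
    }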
diff --git a/src/test/java/com/zdjizhi/json/JsonPathTest.java b/src/test/java/com/zdjizhi/json/JsonPathTest.java
deleted file mode 100644
index cd7ada3..0000000
--- a/src/test/java/com/zdjizhi/json/JsonPathTest.java
+++ /dev/null
@@ -1,79 +0,0 @@
-package com.zdjizhi.json;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.alibaba.nacos.api.NacosFactory;
-import com.alibaba.nacos.api.PropertyKeyConst;
-import com.alibaba.nacos.api.config.ConfigService;
-import com.alibaba.nacos.api.config.listener.Listener;
-import com.alibaba.nacos.api.exception.NacosException;
-import com.jayway.jsonpath.DocumentContext;
-import com.jayway.jsonpath.JsonPath;
-import com.zdjizhi.common.FlowWriteConfig;
-import com.zdjizhi.utils.StringUtil;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.Executor;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi.json
- * @Description:
- * @date 2022/3/24 10:22
- */
-public class JsonPathTest {
-    private static final Log logger = LogFactory.get();
-
-    private static Properties propNacos = new Properties();
-
-    /**
-     * List of fields that need to be dropped
-     */
-    private static ArrayList<String> dropList = new ArrayList<>();
-
-    /**
-     * Map used to hold the reflection classes loaded in memory
-     */
-    private static HashMap map;
-
-    /**
-     * Job list.
-     * Each element is a four-part string array (field carrying the format marker, field to complete, function to apply, parameters used), e.g.:
-     * (mail_subject mail_subject decode_of_base64 mail_subject_charset)
-     */
-    private static ArrayList<String[]> jobList;
-
-    private static String schema;
-
-    static {
-        propNacos.setProperty(PropertyKeyConst.SERVER_ADDR, FlowWriteConfig.NACOS_SERVER);
-        propNacos.setProperty(PropertyKeyConst.NAMESPACE, FlowWriteConfig.NACOS_SCHEMA_NAMESPACE);
-        propNacos.setProperty(PropertyKeyConst.USERNAME, FlowWriteConfig.NACOS_USERNAME);
-        propNacos.setProperty(PropertyKeyConst.PASSWORD, FlowWriteConfig.NACOS_PIN);
-        try {
-            ConfigService configService = NacosFactory.createConfigService(propNacos);
-            String dataId = FlowWriteConfig.NACOS_DATA_ID;
-            String group = FlowWriteConfig.NACOS_GROUP;
-            String config = configService.getConfig(dataId, group, 5000);
-            if (StringUtil.isNotBlank(config)) {
-                schema = config;
-            }
-        } catch (NacosException e) {
-            logger.error("Get Schema config from Nacos error,The exception message is :" + e.getMessage());
-        }
-    }
-
-    @Test
-    public void parseSchemaGetFields() {
-        DocumentContext parse = JsonPath.parse(schema);
-        List<Object> fields = parse.read("$.fields[*]");
-        for (Object field : fields) {
-            String name = JsonPath.read(field, "$.name").toString();
-            String type = JsonPath.read(field, "$.type").toString();
-        }
-    }
-}
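The deleted test exercised the same JsonPath traversal that JsonParseUtil still relies on, so here is a self-contained version with a toy inline schema (the field names are made up; the real document is session_record.json):

    import com.jayway.jsonpath.DocumentContext;
    import com.jayway.jsonpath.JsonPath;

    import java.util.List;

    public class SchemaFieldsDemo {
        public static void main(String[] args) {
            // Toy Avro-style schema standing in for session_record.json
            String schema = "{\"fields\":[" +
                    "{\"name\":\"log_id\",\"type\":\"long\"}," +
                    "{\"name\":\"domain\",\"type\":\"string\"}]}";

            DocumentContext parse = JsonPath.parse(schema);
            List<Object> fields = parse.read("$.fields[*]");
            for (Object field : fields) {
                String name = JsonPath.read(field, "$.name").toString();
                String type = JsonPath.read(field, "$.type").toString();
                System.out.println(name + " -> " + type);   // log_id -> long, domain -> string
            }
        }
    }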
diff --git a/src/test/java/com/zdjizhi/nacos/NacosTest.java b/src/test/java/com/zdjizhi/nacos/NacosTest.java
deleted file mode 100644
index 7745d5f..0000000
--- a/src/test/java/com/zdjizhi/nacos/NacosTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-package com.zdjizhi.nacos;
-
-import com.alibaba.nacos.api.NacosFactory;
-import com.alibaba.nacos.api.PropertyKeyConst;
-import com.alibaba.nacos.api.config.ConfigService;
-import com.alibaba.nacos.api.config.listener.Listener;
-import com.alibaba.nacos.api.exception.NacosException;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.io.StringReader;
-import java.util.Properties;
-import java.util.concurrent.Executor;
-
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi
- * @Description:
- * @date 2022/3/10 16:58
- */
-public class NacosTest {
-
-    /**
-     * <dependency>
-     *     <groupId>com.alibaba.nacos</groupId>
-     *     <artifactId>nacos-client</artifactId>
-     *     <version>1.2.0</version>
-     * </dependency>
-     */
-
-    private static Properties properties = new Properties();
-    /**
-     * config data id = config name
-     */
-    private static final String DATA_ID = "dos_detection.properties";
-    /**
-     * config group
-     */
-    private static final String GROUP = "Galaxy";
-
-    private void getProperties() {
-        properties.setProperty(PropertyKeyConst.SERVER_ADDR, "192.168.44.67:8848");
-        properties.setProperty(PropertyKeyConst.NAMESPACE, "f507879a-8b1b-4330-913e-83d4fcdc14bb");
-        properties.setProperty(PropertyKeyConst.USERNAME, "nacos");
-        properties.setProperty(PropertyKeyConst.PASSWORD, "nacos");
-    }
-
-
-    @Test
-    public void GetConfigurationTest() {
-        try {
-            getProperties();
-            ConfigService configService = NacosFactory.createConfigService(properties);
-            String content = configService.getConfig(DATA_ID, GROUP, 5000);
-            Properties nacosConfigMap = new Properties();
-            nacosConfigMap.load(new StringReader(content));
-            System.out.println(nacosConfigMap.getProperty("source.kafka.servers"));
-            System.out.println(content);
-        } catch (NacosException | IOException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-        }
-
-    }
-
-    @Test
-    public void ListenerConfigurationTest() {
-        getProperties();
-        try {
-            //first get config
-            ConfigService configService = NacosFactory.createConfigService(properties);
-            String config = configService.getConfig(DATA_ID, GROUP, 5000);
-            System.out.println(config);
-
-            //start listener
-            configService.addListener(DATA_ID, GROUP, new Listener() {
-                @Override
-                public Executor getExecutor() {
-                    return null;
-                }
-
-                @Override
-                public void receiveConfigInfo(String configMsg) {
-                    System.out.println(configMsg);
-                }
-            });
-        } catch (NacosException e) {
-            e.printStackTrace();
-        }
-
-        //keep running, change nacos config, print new config
-        while (true) {
-            try {
-                Thread.sleep(5000);
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
-        }
-    }
-}
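With the nacos-client dependency removed from the pom, configuration text now has to be parsed locally, but the parsing half of the deleted test is plain java.util.Properties and keeps working for any source of properties text. A minimal sketch (the payload values are placeholders taken from the properties file above):

    import java.io.StringReader;
    import java.util.Properties;

    public class PropertiesFromStringDemo {
        public static void main(String[] args) throws Exception {
            // Any properties payload, whether read from a file or a config service
            String content = "source.kafka.servers=192.168.45.102:9092\n"
                    + "group.id=flinktest-102\n";

            Properties props = new Properties();
            props.load(new StringReader(content));
            System.out.println(props.getProperty("source.kafka.servers"));
        }
    }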
"session_record.json"; - String group = "Galaxy"; - jobList = getJobListFromHttp(configService.getConfig(dataId, group, 5000)); - configService.addListener(dataId, group, new Listener() { - @Override - public Executor getExecutor() { - return null; - } - - @Override - public void receiveConfigInfo(String configMsg) { - jobList = getJobListFromHttp(configMsg); - } - }); - } catch (NacosException e) { - e.printStackTrace(); - } - } - - - @Test - public void dealCommonMessage() { - //keep running,change nacos config,print new config - while (true) { - try { - System.out.println(Arrays.toString(jobList.get(0))); - Thread.sleep(5000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } - - /** - * 根据http链接获取schema,解析之后返回一个任务列表 (useList toList funcList paramlist) - * - * @return 任务列表 - */ - private static ArrayList getJobListFromHttp(String schema) { - ArrayList list = new ArrayList<>(); - - //获取fields,并转化为数组,数组的每个元素都是一个name doc type - JSONObject schemaJson = JSON.parseObject(schema); - JSONArray fields = (JSONArray) schemaJson.get("fields"); - - for (Object field : fields) { - - if (JSON.parseObject(field.toString()).containsKey("doc")) { - Object doc = JSON.parseObject(field.toString()).get("doc"); - - if (JSON.parseObject(doc.toString()).containsKey("format")) { - String name = JSON.parseObject(field.toString()).get("name").toString(); - Object format = JSON.parseObject(doc.toString()).get("format"); - JSONObject formatObject = JSON.parseObject(format.toString()); - - String functions = formatObject.get("functions").toString(); - String appendTo = null; - String params = null; - - if (formatObject.containsKey("appendTo")) { - appendTo = formatObject.get("appendTo").toString(); - } - - if (formatObject.containsKey("param")) { - params = formatObject.get("param").toString(); - } - - - if (StringUtil.isNotBlank(appendTo) && StringUtil.isBlank(params)) { - String[] functionArray = functions.split(FlowWriteConfig.FORMAT_SPLITTER); - String[] appendToArray = appendTo.split(FlowWriteConfig.FORMAT_SPLITTER); - - for (int i = 0; i < functionArray.length; i++) { - list.add(new String[]{name, appendToArray[i], functionArray[i], null}); - } - - } else if (StringUtil.isNotBlank(appendTo) && StringUtil.isNotBlank(params)) { - String[] functionArray = functions.split(FlowWriteConfig.FORMAT_SPLITTER); - String[] appendToArray = appendTo.split(FlowWriteConfig.FORMAT_SPLITTER); - String[] paramArray = params.split(FlowWriteConfig.FORMAT_SPLITTER); - - for (int i = 0; i < functionArray.length; i++) { - list.add(new String[]{name, appendToArray[i], functionArray[i], paramArray[i]}); - - } - } else { - list.add(new String[]{name, name, functions, params}); - } - - } - } - - } - return list; - } - -}