diff --git a/pom.xml b/pom.xml
index f4d0c5e..2b6f210 100644
--- a/pom.xml
+++ b/pom.xml
@@ -12,6 +12,8 @@
1.13.1
2.1.1
2.7.1
+ 2.11
+ 2.4.0
@@ -140,6 +142,12 @@
+
+ com.jayway.jsonpath
+ json-path
+ ${jsonpath.version}
+
+
org.apache.flink
flink-connector-kafka_2.12
@@ -179,6 +187,13 @@
+
+
+ org.apache.hadoop
+ hadoop-hdfs
+ ${hadoop.version}
+
+
org.apache.hbase
@@ -238,7 +253,7 @@
com.zdjizhi
galaxy
- 1.0.8
+ 1.1.1
slf4j-log4j12
@@ -303,6 +318,27 @@
guava
22.0
+
+
+
+ org.projectlombok
+ lombok
+ 1.18.2
+
+
+
+ com.jayway.jsonpath
+ json-path
+ 2.4.0
+
+
+
+ org.apache.flink
+ flink-streaming-java_${scala.binary.version}
+ ${flink.version}
+
+
+
diff --git a/src/main/java/com/zdjizhi/common/CommonConfig.java b/src/main/java/com/zdjizhi/common/CommonConfig.java
index 62c079b..1b5e4ba 100644
--- a/src/main/java/com/zdjizhi/common/CommonConfig.java
+++ b/src/main/java/com/zdjizhi/common/CommonConfig.java
@@ -9,6 +9,11 @@ import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;
*/
public class CommonConfig {
+ /**
+ * 定位库默认分隔符
+ */
+ public static final String LOCATION_SEPARATOR = ".";
+
private static StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor();
static {
@@ -69,6 +74,24 @@ public class CommonConfig {
public static final int SASL_JAAS_CONFIG_FLAG = CommonConfigurations.getIntProperty("sasl.jaas.config.flag");
+ public static final String NACOS_SERVER_ADDR = CommonConfigurations.getStringProperty("nacos.server.addr");
+ public static final String NACOS_USERNAME = CommonConfigurations.getStringProperty("nacos.username");
+ public static final String NACOS_PASSWORD = CommonConfigurations.getStringProperty("nacos.password");
+ public static final String NACOS_DATA_ID = CommonConfigurations.getStringProperty("nacos.data.id");
+ public static final String NACOS_GROUP = CommonConfigurations.getStringProperty("nacos.group");
+ public static final int NACOS_READ_TIMEOUT = CommonConfigurations.getIntProperty("nacos.read.timeout");
+
+ public static final String HOS_TOKEN = CommonConfigurations.getStringProperty("hos.token");
+
+ public static final String CLUSTER_OR_SINGLE = CommonConfigurations.getStringProperty("cluster.or.single");
+
+ public static final String HDFS_URI_NS1 = CommonConfigurations.getStringProperty("hdfs.uri.nn1");
+ public static final String HDFS_URI_NS2 = CommonConfigurations.getStringProperty("hdfs.uri.nn2");
+ public static final String HDFS_PATH = CommonConfigurations.getStringProperty("hdfs.path");
+ public static final String HDFS_USER = CommonConfigurations.getStringProperty("hdfs.user");
+
+ public static final String DOWNLOAD_PATH = CommonConfigurations.getStringProperty("download.path");
+
public static void main(String[] args) {
StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor();
// 配置加密解密的密码/salt值
diff --git a/src/main/java/com/zdjizhi/common/CustomFile.java b/src/main/java/com/zdjizhi/common/CustomFile.java
new file mode 100644
index 0000000..701024c
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/CustomFile.java
@@ -0,0 +1,26 @@
+package com.zdjizhi.common;
+
+import java.io.Serializable;
+
+public class CustomFile implements Serializable {
+
+ String fileName;
+
+ byte[] content;
+
+ public String getFileName() {
+ return fileName;
+ }
+
+ public void setFileName(String fileName) {
+ this.fileName = fileName;
+ }
+
+ public byte[] getContent() {
+ return content;
+ }
+
+ public void setContent(byte[] content) {
+ this.content = content;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/common/KnowledgeLog.java b/src/main/java/com/zdjizhi/common/KnowledgeLog.java
new file mode 100644
index 0000000..d72f7df
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/KnowledgeLog.java
@@ -0,0 +1,91 @@
+package com.zdjizhi.common;
+
+public class KnowledgeLog {
+ public String id;
+ public String name;
+ public String path;
+ public Long size;
+ public String format;
+ public String sha256;
+ public String version;
+ public String updateTime;
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public void setPath(String path) {
+ this.path = path;
+ }
+
+ public Long getSize() {
+ return size;
+ }
+
+ public void setSize(Long size) {
+ this.size = size;
+ }
+
+ public String getFormat() {
+ return format;
+ }
+
+ public void setFormat(String format) {
+ this.format = format;
+ }
+
+ public String getSha256() {
+ return sha256;
+ }
+
+ public void setSha256(String sha256) {
+ this.sha256 = sha256;
+ }
+
+ public String getVersion() {
+ return version;
+ }
+
+ public void setVersion(String version) {
+ this.version = version;
+ }
+
+ public String getUpdateTime() {
+ return updateTime;
+ }
+
+ public void setUpdateTime(String updateTime) {
+ this.updateTime = updateTime;
+ }
+
+
+ @Override
+ public String toString() {
+ return "KnowledgeLog{" +
+ "id='" + id + '\'' +
+ ", name='" + name + '\'' +
+ ", path='" + path + '\'' +
+ ", size=" + size +
+ ", format='" + format + '\'' +
+ ", sha256='" + sha256 + '\'' +
+ ", version='" + version + '\'' +
+ ", updateTime='" + updateTime + '\'' +
+ '}';
+ }
+}
diff --git a/src/main/java/com/zdjizhi/etl/ParseSketchLog.java b/src/main/java/com/zdjizhi/etl/ParseSketchLog.java
index 3eefff8..2ef2b1b 100644
--- a/src/main/java/com/zdjizhi/etl/ParseSketchLog.java
+++ b/src/main/java/com/zdjizhi/etl/ParseSketchLog.java
@@ -1,30 +1,38 @@
package com.zdjizhi.etl;
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
+import com.alibaba.nacos.api.PropertyKeyConst;
import com.fasterxml.jackson.databind.JavaType;
import com.zdjizhi.common.CommonConfig;
+import com.zdjizhi.common.CustomFile;
import com.zdjizhi.common.DosSketchLog;
+import com.zdjizhi.function.BroadcastProcessFunc;
import com.zdjizhi.source.DosSketchSource;
import com.zdjizhi.utils.FlinkEnvironmentUtils;
import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.StringUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.time.Duration;
-import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.*;
/**
* @author wlh
*/
public class ParseSketchLog {
-// private static Logger logger = LoggerFactory.getLogger(ParseSketchLog.class);
- private static final Log logger = LogFactory.get();
+ private static Logger logger = LoggerFactory.getLogger(ParseSketchLog.class);
private static JsonMapper jsonMapperInstance = JsonMapper.getInstance();
private static JavaType hashmapJsonType = jsonMapperInstance.createCollectionType(HashMap.class, String.class, Object.class);
private static JavaType listType = jsonMapperInstance.createCollectionType(ArrayList.class, HashMap.class);
@@ -35,7 +43,28 @@ public class ParseSketchLog {
}
private static SingleOutputStreamOperator flatSketchSource(){
- return DosSketchSource.createDosSketchSource().flatMap(new FlatSketchLog());
+
+ DataStreamSource