TSG-15167 LTS版本新增知识库文件校验功能
This commit is contained in:
@@ -2,6 +2,8 @@ package com.zdjizhi.source;
|
|||||||
|
|
||||||
import cn.hutool.core.io.FileUtil;
|
import cn.hutool.core.io.FileUtil;
|
||||||
import cn.hutool.core.io.IoUtil;
|
import cn.hutool.core.io.IoUtil;
|
||||||
|
import cn.hutool.core.io.file.FileReader;
|
||||||
|
import cn.hutool.crypto.digest.DigestUtil;
|
||||||
import cn.hutool.json.JSONObject;
|
import cn.hutool.json.JSONObject;
|
||||||
import com.alibaba.nacos.api.NacosFactory;
|
import com.alibaba.nacos.api.NacosFactory;
|
||||||
import com.alibaba.nacos.api.PropertyKeyConst;
|
import com.alibaba.nacos.api.PropertyKeyConst;
|
||||||
@@ -28,10 +30,18 @@ import java.util.concurrent.Executor;
|
|||||||
|
|
||||||
|
|
||||||
public class HttpSource extends RichHttpSourceFunction<Map<String, byte[]>> {
|
public class HttpSource extends RichHttpSourceFunction<Map<String, byte[]>> {
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(HttpSource.class);
|
private static final Logger logger = LoggerFactory.getLogger(HttpSource.class);
|
||||||
|
|
||||||
|
private static final int TRY_TIMES = 3;
|
||||||
|
|
||||||
private static final String EXPR = "$.[?(@.version=='latest' && @.name in ['ip_v4_built_in','ip_v6_built_in','ip_v4_user_defined','ip_v6_user_defined'])].['name','sha256','format','path']";
|
private static final String EXPR = "$.[?(@.version=='latest' && @.name in ['ip_v4_built_in','ip_v6_built_in','ip_v4_user_defined','ip_v6_user_defined'])].['name','sha256','format','path']";
|
||||||
|
// private static final String EXPR = "$.[?(@.version=='latest' && @.name in ['ip_v4_user_defined'])].['name','sha256','format','path']";
|
||||||
|
|
||||||
|
private static Map<String, String> knowledgeMetaCache = new HashMap<>();
|
||||||
|
|
||||||
|
private static HashMap<String, byte[]> knowledgeFileCache;
|
||||||
|
|
||||||
|
private static HttpClientUtils2 httpClientUtils;
|
||||||
|
|
||||||
//连接nacos的配置
|
//连接nacos的配置
|
||||||
private Properties nacosProperties;
|
private Properties nacosProperties;
|
||||||
@@ -50,11 +60,17 @@ public class HttpSource extends RichHttpSourceFunction<Map<String, byte[]>> {
|
|||||||
|
|
||||||
private ConfigService configService;
|
private ConfigService configService;
|
||||||
|
|
||||||
// private static JsonMapper jsonMapperInstance = JsonMapper.getInstance();
|
private static Header header;
|
||||||
// private static JavaType listType = jsonMapperInstance.createCollectionType(List.class, KnowledgeLog.class);
|
|
||||||
private static Map<String, String> updateMap = new HashMap<>();
|
//运行状态cancel时置为false
|
||||||
private static HashMap<String, byte[]> knowledgeFileCache;
|
|
||||||
private boolean isRunning = true;
|
private boolean isRunning = true;
|
||||||
|
//是否下发,默认不发送
|
||||||
|
private boolean isSending = false;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
// private boolean isRunning = true;
|
||||||
|
|
||||||
|
|
||||||
public HttpSource(Properties nacosProperties, String NACOS_DATA_ID, String NACOS_GROUP, long NACOS_READ_TIMEOUT, String storePath) {
|
public HttpSource(Properties nacosProperties, String NACOS_DATA_ID, String NACOS_GROUP, long NACOS_READ_TIMEOUT, String storePath) {
|
||||||
@@ -68,10 +84,12 @@ public class HttpSource extends RichHttpSourceFunction<Map<String, byte[]>> {
|
|||||||
@Override
|
@Override
|
||||||
public void open(Configuration parameters) throws Exception {
|
public void open(Configuration parameters) throws Exception {
|
||||||
super.open(parameters);
|
super.open(parameters);
|
||||||
|
httpClientUtils = new HttpClientUtils2();
|
||||||
//初始化元数据缓存
|
//初始化元数据缓存
|
||||||
updateMap = new HashMap<>(16);
|
knowledgeMetaCache = new HashMap<>(16);
|
||||||
//初始化定位库缓存
|
//初始化定位库缓存
|
||||||
knowledgeFileCache = new HashMap<>(16);
|
knowledgeFileCache = new HashMap<>(16);
|
||||||
|
header = new BasicHeader("token", CommonConfig.HOS_TOKEN);
|
||||||
logger.info("连接nacos:" + nacosProperties.getProperty(PropertyKeyConst.SERVER_ADDR));
|
logger.info("连接nacos:" + nacosProperties.getProperty(PropertyKeyConst.SERVER_ADDR));
|
||||||
configService = NacosFactory.createConfigService(nacosProperties);
|
configService = NacosFactory.createConfigService(nacosProperties);
|
||||||
}
|
}
|
||||||
@@ -79,13 +97,12 @@ public class HttpSource extends RichHttpSourceFunction<Map<String, byte[]>> {
|
|||||||
public void run(SourceContext ctx) throws Exception {
|
public void run(SourceContext ctx) throws Exception {
|
||||||
// ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
|
// ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
|
||||||
String config = configService.getConfig(NACOS_DATA_ID, NACOS_GROUP, NACOS_READ_TIMEOUT);
|
String config = configService.getConfig(NACOS_DATA_ID, NACOS_GROUP, NACOS_READ_TIMEOUT);
|
||||||
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
|
||||||
String format = formatter.format(new Date());
|
|
||||||
logger.info(format + "receive config from nacos:" + config);
|
|
||||||
System.out.println(format + "receive config from nacos:" + config);
|
|
||||||
if (StringUtil.isNotBlank(config)) {
|
if (StringUtil.isNotBlank(config)) {
|
||||||
ArrayList<Object> metaList = JsonPath.parse(config).read(EXPR);
|
ArrayList<Object> metaList = JsonPath.parse(config).read(EXPR);
|
||||||
loadKnowledge(metaList);
|
loadKnowledge(metaList);
|
||||||
|
if (isSending){
|
||||||
|
ctx.collect(knowledgeFileCache);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -108,13 +125,14 @@ public class HttpSource extends RichHttpSourceFunction<Map<String, byte[]>> {
|
|||||||
knowledgeJson.getStr("format"));
|
knowledgeJson.getStr("format"));
|
||||||
String sha256 = knowledgeJson.getStr("sha256");
|
String sha256 = knowledgeJson.getStr("sha256");
|
||||||
String filePath = knowledgeJson.getStr("path");
|
String filePath = knowledgeJson.getStr("path");
|
||||||
if (!sha256.equals(updateMap.get(fileName))) {
|
if (!sha256.equals(knowledgeMetaCache.get(fileName))) {
|
||||||
updateMap.put(fileName, sha256);
|
knowledgeMetaCache.put(fileName, sha256);
|
||||||
updateKnowledge(fileName, filePath);
|
updateKnowledge(fileName, filePath,sha256);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
ctx.collect(knowledgeFileCache);
|
if (isSending){
|
||||||
|
ctx.collect(knowledgeFileCache);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -126,7 +144,11 @@ public class HttpSource extends RichHttpSourceFunction<Map<String, byte[]>> {
|
|||||||
});
|
});
|
||||||
|
|
||||||
while (isRunning) {
|
while (isRunning) {
|
||||||
Thread.sleep(10000);
|
try {
|
||||||
|
Thread.sleep(10000);
|
||||||
|
}catch (InterruptedException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
@@ -141,38 +163,92 @@ public class HttpSource extends RichHttpSourceFunction<Map<String, byte[]>> {
|
|||||||
knowledgeJson.getStr("format"));
|
knowledgeJson.getStr("format"));
|
||||||
String sha256 = knowledgeJson.getStr("sha256");
|
String sha256 = knowledgeJson.getStr("sha256");
|
||||||
String filePath = knowledgeJson.getStr("path");
|
String filePath = knowledgeJson.getStr("path");
|
||||||
Header header = new BasicHeader("token", CommonConfig.HOS_TOKEN);
|
byte[] localFileByte = getLocalFile(fileName);
|
||||||
HttpClientUtils2 httpClientUtils = new HttpClientUtils2();
|
String localFileSha256Hex = DigestUtil.sha256Hex(localFileByte);
|
||||||
inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header);
|
if (sha256.equals(localFileSha256Hex)){
|
||||||
updateMap.put(fileName, sha256);
|
logger.info("本地文件{}的sha256为:{} ,Nacos内记录为:{} ,sha256相等", fileName, localFileSha256Hex, sha256);
|
||||||
knowledgeFileCache.put(fileName, IOUtils.toByteArray(inputStream));
|
knowledgeMetaCache.put(fileName, sha256);
|
||||||
|
// knowledgeFileCache.put(fileName, IOUtils.toByteArray(inputStream));
|
||||||
|
knowledgeFileCache.put(fileName, localFileByte);
|
||||||
|
}else {
|
||||||
|
logger.info("本地文件{}的sha256为:{} ,Nacos内记录为:{} ,sha256不相等,更新本地文件及缓存", fileName, localFileSha256Hex, sha256);
|
||||||
|
updateKnowledge(fileName,filePath,sha256);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// } catch (IOException ioException) {
|
||||||
|
} catch (Exception ioException) {
|
||||||
|
ioException.printStackTrace();
|
||||||
|
} finally {
|
||||||
|
IOUtils.closeQuietly(inputStream);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private void updateKnowledge(String fileName, String filePath,String sha256) {
|
||||||
|
InputStream inputStream = null;
|
||||||
|
// FileOutputStream outputStream = null;
|
||||||
|
int retryNum = 0;
|
||||||
|
try {
|
||||||
|
while (retryNum < TRY_TIMES){
|
||||||
|
inputStream = httpClientUtils.httpGetInputStream(filePath, 90000, header);
|
||||||
|
if (inputStream !=null){
|
||||||
|
byte[] downloadBytes = IOUtils.toByteArray(inputStream);
|
||||||
|
if (downloadBytes.length>0){
|
||||||
|
String downloadFileSha256Hex = DigestUtil.sha256Hex(downloadBytes);
|
||||||
|
if (sha256.equals(downloadFileSha256Hex)){
|
||||||
|
logger.info("通过HOS下载{}的sha256为:{} ,Nacos内记录为:{} ,sha256相等", fileName, sha256);
|
||||||
|
// HdfsUtils.uploadFileByBytes(CommonConfig.HDFS_PATH + fileName, downloadBytes);
|
||||||
|
knowledgeMetaCache.put(fileName,sha256);
|
||||||
|
knowledgeFileCache.put(fileName, downloadBytes);
|
||||||
|
updateLocalFile(fileName);
|
||||||
|
retryNum = TRY_TIMES;
|
||||||
|
|
||||||
|
isSending = true;
|
||||||
|
}else {
|
||||||
|
logger.error("通过HOS下载{}的sha256为:{} ,Nacos内记录为:{} ,sha256不相等 开始第{}次重试下载文件", fileName, downloadFileSha256Hex, sha256, retryNum);
|
||||||
|
retryNum++;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (IOException ioException) {
|
} catch (IOException ioException) {
|
||||||
ioException.printStackTrace();
|
ioException.printStackTrace();
|
||||||
} finally {
|
} finally {
|
||||||
IOUtils.closeQuietly(inputStream);
|
IOUtils.closeQuietly(inputStream);
|
||||||
|
// IOUtils.closeQuietly(outputStream);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private void updateKnowledge(String fileName, String filePath) {
|
private void updateLocalFile(String fileName) {
|
||||||
InputStream inputStream = null;
|
|
||||||
FileOutputStream outputStream = null;
|
FileOutputStream outputStream = null;
|
||||||
try {
|
try {
|
||||||
Header header = new BasicHeader("token", CommonConfig.HOS_TOKEN);
|
HdfsUtils.uploadFileByBytes(CommonConfig.HDFS_PATH + fileName, knowledgeFileCache.get(fileName));
|
||||||
HttpClientUtils2 httpClientUtils = new HttpClientUtils2();
|
} catch (IOException ioe) {
|
||||||
inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header);
|
logger.error("更新本地文件{}时发生IO异常,异常信息为:", fileName, ioe.getMessage());
|
||||||
byte[] bytes = IOUtils.toByteArray(inputStream);
|
ioe.printStackTrace();
|
||||||
HdfsUtils.uploadFileByBytes(CommonConfig.HDFS_PATH + fileName, bytes);
|
} catch (RuntimeException e) {
|
||||||
knowledgeFileCache.put(fileName, bytes);
|
logger.error("更新本地文件{}时发生异常,异常信息为:", fileName, e.getMessage());
|
||||||
} catch (IOException ioException) {
|
e.printStackTrace();
|
||||||
ioException.printStackTrace();
|
|
||||||
} finally {
|
} finally {
|
||||||
IOUtils.closeQuietly(inputStream);
|
|
||||||
IOUtils.closeQuietly(outputStream);
|
IOUtils.closeQuietly(outputStream);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static byte[] getLocalFile(String name) {
|
||||||
|
byte[] fileBytes = null;
|
||||||
|
try {
|
||||||
|
fileBytes = HdfsUtils.getFileBytes(CommonConfig.HDFS_PATH + name) ;
|
||||||
|
} catch (RuntimeException | IOException e) {
|
||||||
|
logger.error("IpLookupUtils download MMDB files error, message is:" + e.getMessage());
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
return fileBytes;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void cancel() {
|
public void cancel() {
|
||||||
this.isRunning = false;
|
this.isRunning = false;
|
||||||
|
|||||||
@@ -2,6 +2,8 @@ package com.zdjizhi.source;
|
|||||||
|
|
||||||
import cn.hutool.core.io.FileUtil;
|
import cn.hutool.core.io.FileUtil;
|
||||||
import cn.hutool.core.io.IoUtil;
|
import cn.hutool.core.io.IoUtil;
|
||||||
|
import cn.hutool.core.io.file.FileReader;
|
||||||
|
import cn.hutool.crypto.digest.DigestUtil;
|
||||||
import cn.hutool.json.JSONObject;
|
import cn.hutool.json.JSONObject;
|
||||||
import com.alibaba.nacos.api.NacosFactory;
|
import com.alibaba.nacos.api.NacosFactory;
|
||||||
import com.alibaba.nacos.api.PropertyKeyConst;
|
import com.alibaba.nacos.api.PropertyKeyConst;
|
||||||
@@ -23,13 +25,13 @@ import org.slf4j.Logger;
|
|||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.*;
|
import java.io.*;
|
||||||
|
import java.text.SimpleDateFormat;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
|
|
||||||
public class SingleHttpSource extends RichHttpSourceFunction<Map<String, byte[]>> {
|
public class SingleHttpSource extends RichHttpSourceFunction<Map<String, byte[]>> {
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(SingleHttpSource.class);
|
private static final Logger logger = LoggerFactory.getLogger(SingleHttpSource.class);
|
||||||
private static HashMap<String, byte[]> knowledgeFileCache;
|
|
||||||
|
|
||||||
private Properties nacosProperties;
|
private Properties nacosProperties;
|
||||||
|
|
||||||
@@ -41,16 +43,23 @@ public class SingleHttpSource extends RichHttpSourceFunction<Map<String, byte[]>
|
|||||||
|
|
||||||
private static String STORE_PATH;
|
private static String STORE_PATH;
|
||||||
|
|
||||||
|
private static HttpClientUtils2 httpClientUtils ;
|
||||||
|
|
||||||
private ConfigService configService;
|
private ConfigService configService;
|
||||||
|
|
||||||
// private static JsonMapper jsonMapperInstance = JsonMapper.getInstance();
|
private static Header header;
|
||||||
// private static JavaType listType = jsonMapperInstance.createCollectionType(List.class, KnowledgeLog.class);
|
|
||||||
private static final String EXPR = "$.[?(@.version=='latest' && @.name in ['ip_v4_built_in','ip_v6_built_in','ip_v4_user_defined','ip_v6_user_defined'])].['name','sha256','format','path']";
|
private static final String EXPR = "$.[?(@.version=='latest' && @.name in ['ip_v4_built_in','ip_v6_built_in','ip_v4_user_defined','ip_v6_user_defined'])].['name','sha256','format','path']";
|
||||||
|
|
||||||
|
|
||||||
private static Map<String, String> updateMap = new HashMap<>();
|
private static Map<String, String> knowledgeMetaCache = new HashMap<>();
|
||||||
|
|
||||||
|
private static HashMap<String, byte[]> knowledgeFileCache;
|
||||||
private boolean isRunning = true;
|
private boolean isRunning = true;
|
||||||
|
//是否下发,默认不发送
|
||||||
|
private boolean isSending = false;
|
||||||
|
|
||||||
|
private static final int TRY_TIMES = 3;
|
||||||
|
|
||||||
|
|
||||||
public SingleHttpSource(Properties nacosProperties, String NACOS_DATA_ID, String NACOS_GROUP, long NACOS_READ_TIMEOUT) {
|
public SingleHttpSource(Properties nacosProperties, String NACOS_DATA_ID, String NACOS_GROUP, long NACOS_READ_TIMEOUT) {
|
||||||
@@ -65,33 +74,33 @@ public class SingleHttpSource extends RichHttpSourceFunction<Map<String, byte[]>
|
|||||||
@Override
|
@Override
|
||||||
public void open(Configuration parameters) throws Exception {
|
public void open(Configuration parameters) throws Exception {
|
||||||
super.open(parameters);
|
super.open(parameters);
|
||||||
|
httpClientUtils = new HttpClientUtils2();
|
||||||
logger.info("连接nacos:" + nacosProperties.getProperty(PropertyKeyConst.SERVER_ADDR));
|
logger.info("连接nacos:" + nacosProperties.getProperty(PropertyKeyConst.SERVER_ADDR));
|
||||||
configService = NacosFactory.createConfigService(nacosProperties);
|
configService = NacosFactory.createConfigService(nacosProperties);
|
||||||
//初始化元数据缓存
|
//初始化元数据缓存
|
||||||
updateMap = new HashMap<>(16);
|
knowledgeMetaCache = new HashMap<>(16);
|
||||||
//初始化定位库缓存
|
//初始化定位库缓存
|
||||||
knowledgeFileCache = new HashMap<>(16);
|
knowledgeFileCache = new HashMap<>(16);
|
||||||
|
|
||||||
|
header = new BasicHeader("token", CommonConfig.HOS_TOKEN);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run(SourceContext ctx) throws Exception {
|
public void run(SourceContext ctx) throws Exception {
|
||||||
// ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
|
// ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
|
||||||
String config = configService.getConfig(NACOS_DATA_ID, NACOS_GROUP, NACOS_READ_TIMEOUT);
|
String config = configService.getConfig(NACOS_DATA_ID, NACOS_GROUP, NACOS_READ_TIMEOUT);
|
||||||
// List<CustomFile> customFiles = new ArrayList<>();
|
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
||||||
|
String format = formatter.format(new Date());
|
||||||
|
logger.info(format + "receive config from nacos:" + config);
|
||||||
|
System.out.println(format + "receive config from nacos:" + config);
|
||||||
if (StringUtil.isNotBlank(config)) {
|
if (StringUtil.isNotBlank(config)) {
|
||||||
ArrayList<Object> metaList = JsonPath.parse(config).read(EXPR);
|
ArrayList<Object> metaList = JsonPath.parse(config).read(EXPR);
|
||||||
loadKnowledge(metaList);
|
loadKnowledge(metaList);
|
||||||
|
if (isSending){
|
||||||
|
ctx.collect(knowledgeFileCache);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// if (StringUtil.isNotBlank(config)) {
|
|
||||||
// List<KnowledgeLog> knowledgeLogListList = jsonMapperInstance.fromJson(config, listType);
|
|
||||||
// if (knowledgeLogListList.size()>=1){
|
|
||||||
// for (KnowledgeLog knowledgeLog : knowledgeLogListList) {
|
|
||||||
// String name = knowledgeLog.getName().concat(".").concat(knowledgeLog.getFormat());
|
|
||||||
// String sha256 = knowledgeLog.getSha256();
|
|
||||||
// updateMap.put(name,sha256);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
configService.addListener(NACOS_DATA_ID, NACOS_GROUP, new Listener() {
|
configService.addListener(NACOS_DATA_ID, NACOS_GROUP, new Listener() {
|
||||||
@Override
|
@Override
|
||||||
@@ -112,13 +121,14 @@ public class SingleHttpSource extends RichHttpSourceFunction<Map<String, byte[]>
|
|||||||
knowledgeJson.getStr("format"));
|
knowledgeJson.getStr("format"));
|
||||||
String sha256 = knowledgeJson.getStr("sha256");
|
String sha256 = knowledgeJson.getStr("sha256");
|
||||||
String filePath = knowledgeJson.getStr("path");
|
String filePath = knowledgeJson.getStr("path");
|
||||||
if (!sha256.equals(updateMap.get(fileName))) {
|
if (!sha256.equals(knowledgeMetaCache.get(fileName))) {
|
||||||
updateMap.put(fileName, sha256);
|
knowledgeMetaCache.put(fileName, sha256);
|
||||||
updateKnowledge(fileName, filePath);
|
updateKnowledge(fileName, filePath,sha256);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
ctx.collect(knowledgeFileCache);
|
if (isSending){
|
||||||
|
ctx.collect(knowledgeFileCache);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -137,78 +147,105 @@ public class SingleHttpSource extends RichHttpSourceFunction<Map<String, byte[]>
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// private CustomFile loadKnowledge(String fileName, String filePath) {
|
private void loadKnowledge(ArrayList<Object> metaList) {
|
||||||
// InputStream inputStream = null;
|
// InputStream inputStream = null;
|
||||||
// FileOutputStream outputStream = null;
|
|
||||||
// CustomFile customFile = new CustomFile();
|
|
||||||
// try {
|
|
||||||
// customFile.setFileName(fileName);
|
|
||||||
// Header header = new BasicHeader("token", CommonConfig.HOS_TOKEN);
|
|
||||||
// HttpClientUtils2 httpClientUtils = new HttpClientUtils2();
|
|
||||||
// inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header);
|
|
||||||
// FileUtil.mkdir(CommonConfig.DOWNLOAD_PATH);
|
|
||||||
// File file = new File(CommonConfig.DOWNLOAD_PATH.concat(File.separator).concat(fileName));
|
|
||||||
// outputStream = new FileOutputStream(file);
|
|
||||||
// byte[] bytes = IOUtils.toByteArray(inputStream);
|
|
||||||
// customFile.setContent(bytes);
|
|
||||||
// inputStream = new ByteArrayInputStream(customFile.getContent());
|
|
||||||
// IoUtil.copy(inputStream, outputStream);
|
|
||||||
//
|
|
||||||
// } catch (IOException ioException) {
|
|
||||||
// ioException.printStackTrace();
|
|
||||||
// } finally {
|
|
||||||
// IOUtils.closeQuietly(inputStream);
|
|
||||||
// IOUtils.closeQuietly(outputStream);
|
|
||||||
// }
|
|
||||||
// return customFile;
|
|
||||||
// }
|
|
||||||
private void loadKnowledge(ArrayList<Object> metaList) {
|
|
||||||
InputStream inputStream = null;
|
|
||||||
try {
|
|
||||||
if (metaList.size() >= 1) {
|
|
||||||
for (Object metadata : metaList) {
|
|
||||||
JSONObject knowledgeJson = new JSONObject(metadata, false, true);
|
|
||||||
String fileName = Joiner.on(CommonConfig.LOCATION_SEPARATOR).useForNull("").join(knowledgeJson.getStr("name"),
|
|
||||||
knowledgeJson.getStr("format"));
|
|
||||||
String sha256 = knowledgeJson.getStr("sha256");
|
|
||||||
String filePath = knowledgeJson.getStr("path");
|
|
||||||
Header header = new BasicHeader("token", CommonConfig.HOS_TOKEN);
|
|
||||||
HttpClientUtils2 httpClientUtils = new HttpClientUtils2();
|
|
||||||
inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header);
|
|
||||||
updateMap.put(fileName, sha256);
|
|
||||||
knowledgeFileCache.put(fileName, IOUtils.toByteArray(inputStream));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (IOException ioException) {
|
|
||||||
ioException.printStackTrace();
|
|
||||||
} finally {
|
|
||||||
IOUtils.closeQuietly(inputStream);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
private void updateKnowledge(String fileName, String filePath) {
|
|
||||||
InputStream inputStream = null;
|
|
||||||
FileOutputStream outputStream = null;
|
|
||||||
try {
|
try {
|
||||||
Header header = new BasicHeader("token", CommonConfig.HOS_TOKEN);
|
if (metaList.size() >= 1) {
|
||||||
HttpClientUtils2 httpClientUtils = new HttpClientUtils2();
|
for (Object metadata : metaList) {
|
||||||
inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header);
|
JSONObject knowledgeJson = new JSONObject(metadata, false, true);
|
||||||
FileUtil.mkdir(CommonConfig.DOWNLOAD_PATH);
|
String fileName = Joiner.on(CommonConfig.LOCATION_SEPARATOR).useForNull("").join(knowledgeJson.getStr("name"),
|
||||||
File file = new File(CommonConfig.DOWNLOAD_PATH.concat(File.separator).concat(fileName));
|
knowledgeJson.getStr("format"));
|
||||||
outputStream = new FileOutputStream(file);
|
String sha256 = knowledgeJson.getStr("sha256");
|
||||||
byte[] bytes = IOUtils.toByteArray(inputStream);
|
String filePath = knowledgeJson.getStr("path");
|
||||||
knowledgeFileCache.put(fileName, bytes);
|
byte[] localFileByte = getLocalFile(fileName);
|
||||||
inputStream=new ByteArrayInputStream(bytes);
|
String localFileSha256Hex = DigestUtil.sha256Hex(localFileByte);
|
||||||
IoUtil.copy(inputStream, outputStream);
|
if (sha256.equals(localFileSha256Hex)){
|
||||||
|
logger.info("本地文件{}的sha256为:{} ,Nacos内记录为:{} ,sha256相等", fileName, localFileSha256Hex, sha256);
|
||||||
|
knowledgeMetaCache.put(fileName, sha256);
|
||||||
|
knowledgeFileCache.put(fileName, localFileByte);
|
||||||
|
}else {
|
||||||
|
logger.info("本地文件{}的sha256为:{} ,Nacos内记录为:{} ,sha256不相等,更新本地文件及缓存", fileName, localFileSha256Hex, sha256);
|
||||||
|
updateKnowledge(fileName,filePath,sha256);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (RuntimeException exception) {
|
||||||
|
exception.printStackTrace();
|
||||||
|
}
|
||||||
|
// finally {
|
||||||
|
// IOUtils.closeQuietly(inputStream);
|
||||||
|
// }
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private void updateKnowledge(String fileName, String filePath,String sha256) {
|
||||||
|
InputStream inputStream = null;
|
||||||
|
// FileOutputStream outputStream = null;
|
||||||
|
int retryNum = 0;
|
||||||
|
try {
|
||||||
|
while (retryNum < TRY_TIMES){
|
||||||
|
inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header);
|
||||||
|
if (inputStream !=null){
|
||||||
|
byte[] downloadBytes = IOUtils.toByteArray(inputStream);
|
||||||
|
if (downloadBytes.length>0){
|
||||||
|
String downloadFileSha256Hex = DigestUtil.sha256Hex(downloadBytes);
|
||||||
|
if (sha256.equals(downloadFileSha256Hex)){
|
||||||
|
logger.info("通过HOS下载{}的sha256为:{} ,Nacos内记录为:{} ,sha256相等", fileName, sha256);
|
||||||
|
knowledgeMetaCache.put(fileName, sha256);
|
||||||
|
knowledgeFileCache.put(fileName, downloadBytes);
|
||||||
|
updateLocalFile(fileName);
|
||||||
|
retryNum = TRY_TIMES;
|
||||||
|
isSending = true;
|
||||||
|
}else {
|
||||||
|
logger.error("通过HOS下载{}的sha256为:{} ,Nacos内记录为:{} ,sha256不相等 开始第{}次重试下载文件", fileName, downloadFileSha256Hex, sha256, retryNum);
|
||||||
|
retryNum++;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
} catch (IOException ioException) {
|
} catch (IOException ioException) {
|
||||||
ioException.printStackTrace();
|
ioException.printStackTrace();
|
||||||
} finally {
|
} finally {
|
||||||
IOUtils.closeQuietly(inputStream);
|
IOUtils.closeQuietly(inputStream);
|
||||||
IOUtils.closeQuietly(outputStream);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
private void updateLocalFile(String fileName) {
|
||||||
|
// InputStream inputStream = null;
|
||||||
|
FileOutputStream outputStream = null;
|
||||||
|
try {
|
||||||
|
FileUtil.mkdir(CommonConfig.DOWNLOAD_PATH);
|
||||||
|
File file = new File(CommonConfig.DOWNLOAD_PATH.concat(File.separator).concat(fileName));
|
||||||
|
outputStream = new FileOutputStream(file);
|
||||||
|
IoUtil.copy(new ByteArrayInputStream(knowledgeFileCache.get(fileName)), outputStream);
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
logger.error("更新本地文件{}时发生IO异常,异常信息为:", fileName, ioe.getMessage());
|
||||||
|
ioe.printStackTrace();
|
||||||
|
} catch (RuntimeException e) {
|
||||||
|
logger.error("更新本地文件{}时发生异常,异常信息为:", fileName, e.getMessage());
|
||||||
|
e.printStackTrace();
|
||||||
|
} finally {
|
||||||
|
IOUtils.closeQuietly(outputStream);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private static byte[] getLocalFile(String name) {
|
||||||
|
byte[] fileBytes = null;
|
||||||
|
try {
|
||||||
|
fileBytes=new FileReader(CommonConfig.DOWNLOAD_PATH + name).readBytes();
|
||||||
|
} catch (RuntimeException e) {
|
||||||
|
logger.error("IpLookupUtils download MMDB files error, message is:" + e.getMessage());
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
return fileBytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void cancel() {
|
public void cancel() {
|
||||||
this.isRunning = false;
|
this.isRunning = false;
|
||||||
|
|||||||
Reference in New Issue
Block a user