diff --git a/src/main/java/com/zdjizhi/source/HttpSource.java b/src/main/java/com/zdjizhi/source/HttpSource.java index 6451fc1..3080d25 100644 --- a/src/main/java/com/zdjizhi/source/HttpSource.java +++ b/src/main/java/com/zdjizhi/source/HttpSource.java @@ -2,6 +2,8 @@ package com.zdjizhi.source; import cn.hutool.core.io.FileUtil; import cn.hutool.core.io.IoUtil; +import cn.hutool.core.io.file.FileReader; +import cn.hutool.crypto.digest.DigestUtil; import cn.hutool.json.JSONObject; import com.alibaba.nacos.api.NacosFactory; import com.alibaba.nacos.api.PropertyKeyConst; @@ -28,10 +30,18 @@ import java.util.concurrent.Executor; public class HttpSource extends RichHttpSourceFunction> { - private static final Logger logger = LoggerFactory.getLogger(HttpSource.class); + private static final int TRY_TIMES = 3; + private static final String EXPR = "$.[?(@.version=='latest' && @.name in ['ip_v4_built_in','ip_v6_built_in','ip_v4_user_defined','ip_v6_user_defined'])].['name','sha256','format','path']"; +// private static final String EXPR = "$.[?(@.version=='latest' && @.name in ['ip_v4_user_defined'])].['name','sha256','format','path']"; + + private static Map knowledgeMetaCache = new HashMap<>(); + + private static HashMap knowledgeFileCache; + + private static HttpClientUtils2 httpClientUtils; //连接nacos的配置 private Properties nacosProperties; @@ -50,11 +60,17 @@ public class HttpSource extends RichHttpSourceFunction> { private ConfigService configService; -// private static JsonMapper jsonMapperInstance = JsonMapper.getInstance(); -// private static JavaType listType = jsonMapperInstance.createCollectionType(List.class, KnowledgeLog.class); - private static Map updateMap = new HashMap<>(); - private static HashMap knowledgeFileCache; + private static Header header; + + //运行状态cancel时置为false private boolean isRunning = true; + //是否下发,默认不发送 + private boolean isSending = false; + + + + +// private boolean isRunning = true; public HttpSource(Properties nacosProperties, String NACOS_DATA_ID, String NACOS_GROUP, long NACOS_READ_TIMEOUT, String storePath) { @@ -68,10 +84,12 @@ public class HttpSource extends RichHttpSourceFunction> { @Override public void open(Configuration parameters) throws Exception { super.open(parameters); + httpClientUtils = new HttpClientUtils2(); //初始化元数据缓存 - updateMap = new HashMap<>(16); + knowledgeMetaCache = new HashMap<>(16); //初始化定位库缓存 knowledgeFileCache = new HashMap<>(16); + header = new BasicHeader("token", CommonConfig.HOS_TOKEN); logger.info("连接nacos:" + nacosProperties.getProperty(PropertyKeyConst.SERVER_ADDR)); configService = NacosFactory.createConfigService(nacosProperties); } @@ -79,13 +97,12 @@ public class HttpSource extends RichHttpSourceFunction> { public void run(SourceContext ctx) throws Exception { // ctx.emitWatermark(new Watermark(Long.MAX_VALUE)); String config = configService.getConfig(NACOS_DATA_ID, NACOS_GROUP, NACOS_READ_TIMEOUT); - SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - String format = formatter.format(new Date()); - logger.info(format + "receive config from nacos:" + config); - System.out.println(format + "receive config from nacos:" + config); if (StringUtil.isNotBlank(config)) { ArrayList metaList = JsonPath.parse(config).read(EXPR); loadKnowledge(metaList); + if (isSending){ + ctx.collect(knowledgeFileCache); + } } @@ -108,13 +125,14 @@ public class HttpSource extends RichHttpSourceFunction> { knowledgeJson.getStr("format")); String sha256 = knowledgeJson.getStr("sha256"); String filePath = knowledgeJson.getStr("path"); - if (!sha256.equals(updateMap.get(fileName))) { - updateMap.put(fileName, sha256); - updateKnowledge(fileName, filePath); + if (!sha256.equals(knowledgeMetaCache.get(fileName))) { + knowledgeMetaCache.put(fileName, sha256); + updateKnowledge(fileName, filePath,sha256); } - } - ctx.collect(knowledgeFileCache); + if (isSending){ + ctx.collect(knowledgeFileCache); + } } } @@ -126,7 +144,11 @@ public class HttpSource extends RichHttpSourceFunction> { }); while (isRunning) { - Thread.sleep(10000); + try { + Thread.sleep(10000); + }catch (InterruptedException e) { + e.printStackTrace(); + } } } @@ -141,38 +163,92 @@ public class HttpSource extends RichHttpSourceFunction> { knowledgeJson.getStr("format")); String sha256 = knowledgeJson.getStr("sha256"); String filePath = knowledgeJson.getStr("path"); - Header header = new BasicHeader("token", CommonConfig.HOS_TOKEN); - HttpClientUtils2 httpClientUtils = new HttpClientUtils2(); - inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header); - updateMap.put(fileName, sha256); - knowledgeFileCache.put(fileName, IOUtils.toByteArray(inputStream)); + byte[] localFileByte = getLocalFile(fileName); + String localFileSha256Hex = DigestUtil.sha256Hex(localFileByte); + if (sha256.equals(localFileSha256Hex)){ + logger.info("本地文件{}的sha256为:{} ,Nacos内记录为:{} ,sha256相等", fileName, localFileSha256Hex, sha256); + knowledgeMetaCache.put(fileName, sha256); +// knowledgeFileCache.put(fileName, IOUtils.toByteArray(inputStream)); + knowledgeFileCache.put(fileName, localFileByte); + }else { + logger.info("本地文件{}的sha256为:{} ,Nacos内记录为:{} ,sha256不相等,更新本地文件及缓存", fileName, localFileSha256Hex, sha256); + updateKnowledge(fileName,filePath,sha256); + } + + } + } +// } catch (IOException ioException) { + } catch (Exception ioException) { + ioException.printStackTrace(); + } finally { + IOUtils.closeQuietly(inputStream); + } + } + + + private void updateKnowledge(String fileName, String filePath,String sha256) { + InputStream inputStream = null; +// FileOutputStream outputStream = null; + int retryNum = 0; + try { + while (retryNum < TRY_TIMES){ + inputStream = httpClientUtils.httpGetInputStream(filePath, 90000, header); + if (inputStream !=null){ + byte[] downloadBytes = IOUtils.toByteArray(inputStream); + if (downloadBytes.length>0){ + String downloadFileSha256Hex = DigestUtil.sha256Hex(downloadBytes); + if (sha256.equals(downloadFileSha256Hex)){ + logger.info("通过HOS下载{}的sha256为:{} ,Nacos内记录为:{} ,sha256相等", fileName, sha256); +// HdfsUtils.uploadFileByBytes(CommonConfig.HDFS_PATH + fileName, downloadBytes); + knowledgeMetaCache.put(fileName,sha256); + knowledgeFileCache.put(fileName, downloadBytes); + updateLocalFile(fileName); + retryNum = TRY_TIMES; + + isSending = true; + }else { + logger.error("通过HOS下载{}的sha256为:{} ,Nacos内记录为:{} ,sha256不相等 开始第{}次重试下载文件", fileName, downloadFileSha256Hex, sha256, retryNum); + retryNum++; + } + } } } } catch (IOException ioException) { ioException.printStackTrace(); } finally { IOUtils.closeQuietly(inputStream); +// IOUtils.closeQuietly(outputStream); } } - private void updateKnowledge(String fileName, String filePath) { - InputStream inputStream = null; + private void updateLocalFile(String fileName) { FileOutputStream outputStream = null; try { - Header header = new BasicHeader("token", CommonConfig.HOS_TOKEN); - HttpClientUtils2 httpClientUtils = new HttpClientUtils2(); - inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header); - byte[] bytes = IOUtils.toByteArray(inputStream); - HdfsUtils.uploadFileByBytes(CommonConfig.HDFS_PATH + fileName, bytes); - knowledgeFileCache.put(fileName, bytes); - } catch (IOException ioException) { - ioException.printStackTrace(); + HdfsUtils.uploadFileByBytes(CommonConfig.HDFS_PATH + fileName, knowledgeFileCache.get(fileName)); + } catch (IOException ioe) { + logger.error("更新本地文件{}时发生IO异常,异常信息为:", fileName, ioe.getMessage()); + ioe.printStackTrace(); + } catch (RuntimeException e) { + logger.error("更新本地文件{}时发生异常,异常信息为:", fileName, e.getMessage()); + e.printStackTrace(); } finally { - IOUtils.closeQuietly(inputStream); IOUtils.closeQuietly(outputStream); } + } + + private static byte[] getLocalFile(String name) { + byte[] fileBytes = null; + try { + fileBytes = HdfsUtils.getFileBytes(CommonConfig.HDFS_PATH + name) ; + } catch (RuntimeException | IOException e) { + logger.error("IpLookupUtils download MMDB files error, message is:" + e.getMessage()); + e.printStackTrace(); + } + return fileBytes; + } + @Override public void cancel() { this.isRunning = false; diff --git a/src/main/java/com/zdjizhi/source/SingleHttpSource.java b/src/main/java/com/zdjizhi/source/SingleHttpSource.java index a72a82a..adfddb9 100644 --- a/src/main/java/com/zdjizhi/source/SingleHttpSource.java +++ b/src/main/java/com/zdjizhi/source/SingleHttpSource.java @@ -2,6 +2,8 @@ package com.zdjizhi.source; import cn.hutool.core.io.FileUtil; import cn.hutool.core.io.IoUtil; +import cn.hutool.core.io.file.FileReader; +import cn.hutool.crypto.digest.DigestUtil; import cn.hutool.json.JSONObject; import com.alibaba.nacos.api.NacosFactory; import com.alibaba.nacos.api.PropertyKeyConst; @@ -23,13 +25,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.*; +import java.text.SimpleDateFormat; import java.util.*; import java.util.concurrent.Executor; public class SingleHttpSource extends RichHttpSourceFunction> { private static final Logger logger = LoggerFactory.getLogger(SingleHttpSource.class); - private static HashMap knowledgeFileCache; private Properties nacosProperties; @@ -41,16 +43,23 @@ public class SingleHttpSource extends RichHttpSourceFunction private static String STORE_PATH; + private static HttpClientUtils2 httpClientUtils ; + private ConfigService configService; -// private static JsonMapper jsonMapperInstance = JsonMapper.getInstance(); -// private static JavaType listType = jsonMapperInstance.createCollectionType(List.class, KnowledgeLog.class); + private static Header header; + private static final String EXPR = "$.[?(@.version=='latest' && @.name in ['ip_v4_built_in','ip_v6_built_in','ip_v4_user_defined','ip_v6_user_defined'])].['name','sha256','format','path']"; - private static Map updateMap = new HashMap<>(); + private static Map knowledgeMetaCache = new HashMap<>(); + private static HashMap knowledgeFileCache; private boolean isRunning = true; + //是否下发,默认不发送 + private boolean isSending = false; + + private static final int TRY_TIMES = 3; public SingleHttpSource(Properties nacosProperties, String NACOS_DATA_ID, String NACOS_GROUP, long NACOS_READ_TIMEOUT) { @@ -65,33 +74,33 @@ public class SingleHttpSource extends RichHttpSourceFunction @Override public void open(Configuration parameters) throws Exception { super.open(parameters); + httpClientUtils = new HttpClientUtils2(); logger.info("连接nacos:" + nacosProperties.getProperty(PropertyKeyConst.SERVER_ADDR)); configService = NacosFactory.createConfigService(nacosProperties); //初始化元数据缓存 - updateMap = new HashMap<>(16); + knowledgeMetaCache = new HashMap<>(16); //初始化定位库缓存 knowledgeFileCache = new HashMap<>(16); + + header = new BasicHeader("token", CommonConfig.HOS_TOKEN); } @Override public void run(SourceContext ctx) throws Exception { // ctx.emitWatermark(new Watermark(Long.MAX_VALUE)); String config = configService.getConfig(NACOS_DATA_ID, NACOS_GROUP, NACOS_READ_TIMEOUT); -// List customFiles = new ArrayList<>(); + SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + String format = formatter.format(new Date()); + logger.info(format + "receive config from nacos:" + config); + System.out.println(format + "receive config from nacos:" + config); if (StringUtil.isNotBlank(config)) { ArrayList metaList = JsonPath.parse(config).read(EXPR); loadKnowledge(metaList); + if (isSending){ + ctx.collect(knowledgeFileCache); + } } -// if (StringUtil.isNotBlank(config)) { -// List knowledgeLogListList = jsonMapperInstance.fromJson(config, listType); -// if (knowledgeLogListList.size()>=1){ -// for (KnowledgeLog knowledgeLog : knowledgeLogListList) { -// String name = knowledgeLog.getName().concat(".").concat(knowledgeLog.getFormat()); -// String sha256 = knowledgeLog.getSha256(); -// updateMap.put(name,sha256); -// } -// } -// } + configService.addListener(NACOS_DATA_ID, NACOS_GROUP, new Listener() { @Override @@ -112,13 +121,14 @@ public class SingleHttpSource extends RichHttpSourceFunction knowledgeJson.getStr("format")); String sha256 = knowledgeJson.getStr("sha256"); String filePath = knowledgeJson.getStr("path"); - if (!sha256.equals(updateMap.get(fileName))) { - updateMap.put(fileName, sha256); - updateKnowledge(fileName, filePath); + if (!sha256.equals(knowledgeMetaCache.get(fileName))) { + knowledgeMetaCache.put(fileName, sha256); + updateKnowledge(fileName, filePath,sha256); } - } - ctx.collect(knowledgeFileCache); + if (isSending){ + ctx.collect(knowledgeFileCache); + } } } @@ -137,78 +147,105 @@ public class SingleHttpSource extends RichHttpSourceFunction } -// private CustomFile loadKnowledge(String fileName, String filePath) { + private void loadKnowledge(ArrayList metaList) { // InputStream inputStream = null; -// FileOutputStream outputStream = null; -// CustomFile customFile = new CustomFile(); -// try { -// customFile.setFileName(fileName); -// Header header = new BasicHeader("token", CommonConfig.HOS_TOKEN); -// HttpClientUtils2 httpClientUtils = new HttpClientUtils2(); -// inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header); -// FileUtil.mkdir(CommonConfig.DOWNLOAD_PATH); -// File file = new File(CommonConfig.DOWNLOAD_PATH.concat(File.separator).concat(fileName)); -// outputStream = new FileOutputStream(file); -// byte[] bytes = IOUtils.toByteArray(inputStream); -// customFile.setContent(bytes); -// inputStream = new ByteArrayInputStream(customFile.getContent()); -// IoUtil.copy(inputStream, outputStream); -// -// } catch (IOException ioException) { -// ioException.printStackTrace(); -// } finally { -// IOUtils.closeQuietly(inputStream); -// IOUtils.closeQuietly(outputStream); -// } -// return customFile; -// } -private void loadKnowledge(ArrayList metaList) { - InputStream inputStream = null; - try { - if (metaList.size() >= 1) { - for (Object metadata : metaList) { - JSONObject knowledgeJson = new JSONObject(metadata, false, true); - String fileName = Joiner.on(CommonConfig.LOCATION_SEPARATOR).useForNull("").join(knowledgeJson.getStr("name"), - knowledgeJson.getStr("format")); - String sha256 = knowledgeJson.getStr("sha256"); - String filePath = knowledgeJson.getStr("path"); - Header header = new BasicHeader("token", CommonConfig.HOS_TOKEN); - HttpClientUtils2 httpClientUtils = new HttpClientUtils2(); - inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header); - updateMap.put(fileName, sha256); - knowledgeFileCache.put(fileName, IOUtils.toByteArray(inputStream)); - } - } - } catch (IOException ioException) { - ioException.printStackTrace(); - } finally { - IOUtils.closeQuietly(inputStream); - } -} - - - private void updateKnowledge(String fileName, String filePath) { - InputStream inputStream = null; - FileOutputStream outputStream = null; try { - Header header = new BasicHeader("token", CommonConfig.HOS_TOKEN); - HttpClientUtils2 httpClientUtils = new HttpClientUtils2(); - inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header); - FileUtil.mkdir(CommonConfig.DOWNLOAD_PATH); - File file = new File(CommonConfig.DOWNLOAD_PATH.concat(File.separator).concat(fileName)); - outputStream = new FileOutputStream(file); - byte[] bytes = IOUtils.toByteArray(inputStream); - knowledgeFileCache.put(fileName, bytes); - inputStream=new ByteArrayInputStream(bytes); - IoUtil.copy(inputStream, outputStream); + if (metaList.size() >= 1) { + for (Object metadata : metaList) { + JSONObject knowledgeJson = new JSONObject(metadata, false, true); + String fileName = Joiner.on(CommonConfig.LOCATION_SEPARATOR).useForNull("").join(knowledgeJson.getStr("name"), + knowledgeJson.getStr("format")); + String sha256 = knowledgeJson.getStr("sha256"); + String filePath = knowledgeJson.getStr("path"); + byte[] localFileByte = getLocalFile(fileName); + String localFileSha256Hex = DigestUtil.sha256Hex(localFileByte); + if (sha256.equals(localFileSha256Hex)){ + logger.info("本地文件{}的sha256为:{} ,Nacos内记录为:{} ,sha256相等", fileName, localFileSha256Hex, sha256); + knowledgeMetaCache.put(fileName, sha256); + knowledgeFileCache.put(fileName, localFileByte); + }else { + logger.info("本地文件{}的sha256为:{} ,Nacos内记录为:{} ,sha256不相等,更新本地文件及缓存", fileName, localFileSha256Hex, sha256); + updateKnowledge(fileName,filePath,sha256); + } + } + } + } catch (RuntimeException exception) { + exception.printStackTrace(); + } +// finally { +// IOUtils.closeQuietly(inputStream); +// } + } + + + private void updateKnowledge(String fileName, String filePath,String sha256) { + InputStream inputStream = null; +// FileOutputStream outputStream = null; + int retryNum = 0; + try { + while (retryNum < TRY_TIMES){ + inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header); + if (inputStream !=null){ + byte[] downloadBytes = IOUtils.toByteArray(inputStream); + if (downloadBytes.length>0){ + String downloadFileSha256Hex = DigestUtil.sha256Hex(downloadBytes); + if (sha256.equals(downloadFileSha256Hex)){ + logger.info("通过HOS下载{}的sha256为:{} ,Nacos内记录为:{} ,sha256相等", fileName, sha256); + knowledgeMetaCache.put(fileName, sha256); + knowledgeFileCache.put(fileName, downloadBytes); + updateLocalFile(fileName); + retryNum = TRY_TIMES; + isSending = true; + }else { + logger.error("通过HOS下载{}的sha256为:{} ,Nacos内记录为:{} ,sha256不相等 开始第{}次重试下载文件", fileName, downloadFileSha256Hex, sha256, retryNum); + retryNum++; + } + + } + } + } + } catch (IOException ioException) { ioException.printStackTrace(); } finally { IOUtils.closeQuietly(inputStream); - IOUtils.closeQuietly(outputStream); } } + + + private void updateLocalFile(String fileName) { +// InputStream inputStream = null; + FileOutputStream outputStream = null; + try { + FileUtil.mkdir(CommonConfig.DOWNLOAD_PATH); + File file = new File(CommonConfig.DOWNLOAD_PATH.concat(File.separator).concat(fileName)); + outputStream = new FileOutputStream(file); + IoUtil.copy(new ByteArrayInputStream(knowledgeFileCache.get(fileName)), outputStream); + } catch (IOException ioe) { + logger.error("更新本地文件{}时发生IO异常,异常信息为:", fileName, ioe.getMessage()); + ioe.printStackTrace(); + } catch (RuntimeException e) { + logger.error("更新本地文件{}时发生异常,异常信息为:", fileName, e.getMessage()); + e.printStackTrace(); + } finally { + IOUtils.closeQuietly(outputStream); + } + + } + + private static byte[] getLocalFile(String name) { + byte[] fileBytes = null; + try { + fileBytes=new FileReader(CommonConfig.DOWNLOAD_PATH + name).readBytes(); + } catch (RuntimeException e) { + logger.error("IpLookupUtils download MMDB files error, message is:" + e.getMessage()); + e.printStackTrace(); + } + return fileBytes; + } + + @Override public void cancel() { this.isRunning = false;