diff --git a/src/main/java/com/zdjizhi/source/HttpSource.java b/src/main/java/com/zdjizhi/source/HttpSource.java index fffb21e..2d187d0 100644 --- a/src/main/java/com/zdjizhi/source/HttpSource.java +++ b/src/main/java/com/zdjizhi/source/HttpSource.java @@ -30,10 +30,18 @@ import java.util.concurrent.Executor; public class HttpSource extends RichHttpSourceFunction> { - private static final int TRY_TIMES = 3; private static final Logger logger = LoggerFactory.getLogger(HttpSource.class); + private static final int TRY_TIMES = 3; + private static final String EXPR = "$.[?(@.version=='latest' && @.name in ['ip_v4_built_in','ip_v6_built_in','ip_v4_user_defined','ip_v6_user_defined'])].['name','sha256','format','path']"; +// private static final String EXPR = "$.[?(@.version=='latest' && @.name in ['ip_v4_user_defined'])].['name','sha256','format','path']"; + + private static Map knowledgeMetaCache = new HashMap<>(); + + private static HashMap knowledgeFileCache; + + private static HttpClientUtils2 httpClientUtils; //连接nacos的配置 private Properties nacosProperties; @@ -59,11 +67,9 @@ public class HttpSource extends RichHttpSourceFunction> { //是否下发,默认不发送 private boolean isSending = false; - private static HttpClientUtils2 httpClientUtils; -// private static JsonMapper jsonMapperInstance = JsonMapper.getInstance(); -// private static JavaType listType = jsonMapperInstance.createCollectionType(List.class, KnowledgeLog.class); - private static Map knowledgeMetaCache = new HashMap<>(); - private static HashMap knowledgeFileCache; + + + // private boolean isRunning = true; @@ -91,10 +97,6 @@ public class HttpSource extends RichHttpSourceFunction> { public void run(SourceContext ctx) throws Exception { // ctx.emitWatermark(new Watermark(Long.MAX_VALUE)); String config = configService.getConfig(NACOS_DATA_ID, NACOS_GROUP, NACOS_READ_TIMEOUT); - SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - String format = formatter.format(new Date()); - logger.info(format + "receive config from nacos:" + config); - System.out.println(format + "receive config from nacos:" + config); if (StringUtil.isNotBlank(config)) { ArrayList metaList = JsonPath.parse(config).read(EXPR); loadKnowledge(metaList); @@ -196,7 +198,7 @@ public class HttpSource extends RichHttpSourceFunction> { if (downloadBytes.length>0){ String downloadFileSha256Hex = DigestUtil.sha256Hex(downloadBytes); if (sha256.equals(downloadFileSha256Hex)){ - logger.info("通过HOS下载{}的sha256为:{} ,Nacos内记录为:{} ,sha256相等", fileName, downloadBytes, sha256); + logger.info("通过HOS下载{}的sha256为:{} ,Nacos内记录为:{} ,sha256相等", fileName, sha256); // HdfsUtils.uploadFileByBytes(CommonConfig.HDFS_PATH + fileName, downloadBytes); knowledgeMetaCache.put(fileName,sha256); knowledgeFileCache.put(fileName, downloadBytes); diff --git a/src/main/java/com/zdjizhi/source/SingleHttpSource.java b/src/main/java/com/zdjizhi/source/SingleHttpSource.java index 8808a01..524eee3 100644 --- a/src/main/java/com/zdjizhi/source/SingleHttpSource.java +++ b/src/main/java/com/zdjizhi/source/SingleHttpSource.java @@ -49,18 +49,16 @@ public class SingleHttpSource extends RichHttpSourceFunction private static Header header; -// private static JsonMapper jsonMapperInstance = JsonMapper.getInstance(); -// private static JavaType listType = jsonMapperInstance.createCollectionType(List.class, KnowledgeLog.class); private static final String EXPR = "$.[?(@.version=='latest' && @.name in ['ip_v4_built_in','ip_v6_built_in','ip_v4_user_defined','ip_v6_user_defined'])].['name','sha256','format','path']"; -// private static Map updateMap = new HashMap<>(); - private static Map knowledgeMetaCache = new HashMap<>(); + private static HashMap knowledgeFileCache; private boolean isRunning = true; //是否下发,默认不发送 private boolean isSending = false; + private static final int TRY_TIMES = 3; @@ -192,7 +190,7 @@ public class SingleHttpSource extends RichHttpSourceFunction if (downloadBytes.length>0){ String downloadFileSha256Hex = DigestUtil.sha256Hex(downloadBytes); if (sha256.equals(downloadFileSha256Hex)){ - logger.info("通过HOS下载{}的sha256为:{} ,Nacos内记录为:{} ,sha256相等", fileName, downloadFileSha256Hex, sha256); + logger.info("通过HOS下载{}的sha256为:{} ,Nacos内记录为:{} ,sha256相等", fileName, sha256); knowledgeMetaCache.put(fileName, sha256); knowledgeFileCache.put(fileName, downloadBytes); updateLocalFile(fileName); @@ -224,10 +222,6 @@ public class SingleHttpSource extends RichHttpSourceFunction File file = new File(CommonConfig.DOWNLOAD_PATH.concat(File.separator).concat(fileName)); outputStream = new FileOutputStream(file); IoUtil.copy(new ByteArrayInputStream(knowledgeFileCache.get(fileName)), outputStream); -// byte[] bytes = IOUtils.toByteArray(inputStream); -// knowledgeFileCache.put(fileName, bytes); -// inputStream=new ByteArrayInputStream(bytes); -// IoUtil.copy(inputStream, outputStream); } catch (IOException ioe) { logger.error("更新本地文件{}时发生IO异常,异常信息为:", fileName, ioe.getMessage()); ioe.printStackTrace();