package com.zdjizhi.source; import cn.hutool.core.io.FileUtil; import cn.hutool.core.io.IoUtil; import cn.hutool.core.io.file.FileReader; import cn.hutool.crypto.digest.DigestUtil; import cn.hutool.json.JSONObject; import com.alibaba.nacos.api.NacosFactory; import com.alibaba.nacos.api.PropertyKeyConst; import com.alibaba.nacos.api.config.ConfigService; import com.alibaba.nacos.api.config.listener.Listener; import com.fasterxml.jackson.databind.JavaType; import com.google.common.base.Joiner; import com.jayway.jsonpath.JsonPath; import com.zdjizhi.common.CommonConfig; import com.zdjizhi.common.CustomFile; import com.zdjizhi.common.KnowledgeLog; import com.zdjizhi.utils.*; import org.apache.commons.io.IOUtils; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.http.Header; import org.apache.http.message.BasicHeader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.*; import java.text.SimpleDateFormat; import java.util.*; import java.util.concurrent.Executor; public class SingleHttpSource extends RichHttpSourceFunction> { private static final Logger logger = LoggerFactory.getLogger(SingleHttpSource.class); private Properties nacosProperties; private String NACOS_DATA_ID; private String NACOS_GROUP; private long NACOS_READ_TIMEOUT; private static String STORE_PATH; private static HttpClientUtils2 httpClientUtils ; private ConfigService configService; private static Header header; // private static JsonMapper jsonMapperInstance = JsonMapper.getInstance(); // private static JavaType listType = jsonMapperInstance.createCollectionType(List.class, KnowledgeLog.class); private static final String EXPR = "$.[?(@.version=='latest' && @.name in ['ip_v4_built_in','ip_v6_built_in','ip_v4_user_defined','ip_v6_user_defined'])].['name','sha256','format','path']"; // private static Map updateMap = new HashMap<>(); private static Map knowledgeMetaCache = new HashMap<>(); private static HashMap knowledgeFileCache; private boolean isRunning = true; //是否下发,默认不发送 private boolean isSending = false; private static final int TRY_TIMES = 3; public SingleHttpSource(Properties nacosProperties, String NACOS_DATA_ID, String NACOS_GROUP, long NACOS_READ_TIMEOUT) { this.nacosProperties = nacosProperties; this.NACOS_DATA_ID = NACOS_DATA_ID; this.NACOS_GROUP = NACOS_GROUP; this.NACOS_READ_TIMEOUT = NACOS_READ_TIMEOUT; } @Override public void open(Configuration parameters) throws Exception { super.open(parameters); httpClientUtils = new HttpClientUtils2(); logger.info("连接nacos:" + nacosProperties.getProperty(PropertyKeyConst.SERVER_ADDR)); configService = NacosFactory.createConfigService(nacosProperties); //初始化元数据缓存 knowledgeMetaCache = new HashMap<>(16); //初始化定位库缓存 knowledgeFileCache = new HashMap<>(16); header = new BasicHeader("token", CommonConfig.HOS_TOKEN); } @Override public void run(SourceContext ctx) throws Exception { // ctx.emitWatermark(new Watermark(Long.MAX_VALUE)); String config = configService.getConfig(NACOS_DATA_ID, NACOS_GROUP, NACOS_READ_TIMEOUT); SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String format = formatter.format(new Date()); logger.info(format + "receive config from nacos:" + config); System.out.println(format + "receive config from nacos:" + config); if (StringUtil.isNotBlank(config)) { ArrayList metaList = JsonPath.parse(config).read(EXPR); loadKnowledge(metaList); if (isSending){ ctx.collect(knowledgeFileCache); } } configService.addListener(NACOS_DATA_ID, NACOS_GROUP, new Listener() { @Override public Executor getExecutor() { return null; } @Override public void receiveConfigInfo(String configMsg) { try { logger.info("receive update config:" + configMsg); if (StringUtil.isNotBlank(configMsg)) { ArrayList metaList = JsonPath.parse(configMsg).read(EXPR); if (metaList.size() >= 1) { for (Object metadata : metaList) { JSONObject knowledgeJson = new JSONObject(metadata, false, true); String fileName = Joiner.on(CommonConfig.LOCATION_SEPARATOR).useForNull("").join(knowledgeJson.getStr("name"), knowledgeJson.getStr("format")); String sha256 = knowledgeJson.getStr("sha256"); String filePath = knowledgeJson.getStr("path"); if (!sha256.equals(knowledgeMetaCache.get(fileName))) { knowledgeMetaCache.put(fileName, sha256); updateKnowledge(fileName, filePath,sha256); } } if (isSending){ ctx.collect(knowledgeFileCache); } } } } catch (Exception e) { logger.error("监听nacos配置失败", e); } System.out.println(configMsg); } }); while (isRunning) { Thread.sleep(10000); } } private void loadKnowledge(ArrayList metaList) { // InputStream inputStream = null; try { if (metaList.size() >= 1) { for (Object metadata : metaList) { JSONObject knowledgeJson = new JSONObject(metadata, false, true); String fileName = Joiner.on(CommonConfig.LOCATION_SEPARATOR).useForNull("").join(knowledgeJson.getStr("name"), knowledgeJson.getStr("format")); String sha256 = knowledgeJson.getStr("sha256"); String filePath = knowledgeJson.getStr("path"); byte[] localFileByte = getLocalFile(fileName); String localFileSha256Hex = DigestUtil.sha256Hex(localFileByte); if (sha256.equals(localFileSha256Hex)){ logger.info("本地文件{}的sha256为:{} ,Nacos内记录为:{} ,sha256相等", fileName, localFileSha256Hex, sha256); knowledgeMetaCache.put(fileName, sha256); knowledgeFileCache.put(fileName, localFileByte); }else { logger.info("本地文件{}的sha256为:{} ,Nacos内记录为:{} ,sha256不相等,更新本地文件及缓存", fileName, localFileSha256Hex, sha256); updateKnowledge(fileName,filePath,sha256); } } } } catch (RuntimeException exception) { exception.printStackTrace(); } // finally { // IOUtils.closeQuietly(inputStream); // } } private void updateKnowledge(String fileName, String filePath,String sha256) { InputStream inputStream = null; // FileOutputStream outputStream = null; int retryNum = 0; try { while (retryNum < TRY_TIMES){ inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header); if (inputStream !=null){ byte[] downloadBytes = IOUtils.toByteArray(inputStream); if (downloadBytes.length>0){ String downloadFileSha256Hex = DigestUtil.sha256Hex(downloadBytes); if (sha256.equals(downloadFileSha256Hex)){ logger.info("通过HOS下载{}的sha256为:{} ,Nacos内记录为:{} ,sha256相等", fileName, downloadFileSha256Hex, sha256); knowledgeMetaCache.put(fileName, sha256); knowledgeFileCache.put(fileName, downloadBytes); updateLocalFile(fileName); retryNum = TRY_TIMES; isSending = true; }else { logger.error("通过HOS下载{}的sha256为:{} ,Nacos内记录为:{} ,sha256不相等 开始第{}次重试下载文件", fileName, downloadFileSha256Hex, sha256, retryNum); retryNum++; } } } } } catch (IOException ioException) { ioException.printStackTrace(); } finally { IOUtils.closeQuietly(inputStream); } } private void updateLocalFile(String fileName) { // InputStream inputStream = null; FileOutputStream outputStream = null; try { FileUtil.mkdir(CommonConfig.DOWNLOAD_PATH); File file = new File(CommonConfig.DOWNLOAD_PATH.concat(File.separator).concat(fileName)); outputStream = new FileOutputStream(file); IoUtil.copy(new ByteArrayInputStream(knowledgeFileCache.get(fileName)), outputStream); // byte[] bytes = IOUtils.toByteArray(inputStream); // knowledgeFileCache.put(fileName, bytes); // inputStream=new ByteArrayInputStream(bytes); // IoUtil.copy(inputStream, outputStream); } catch (IOException ioe) { logger.error("更新本地文件{}时发生IO异常,异常信息为:", fileName, ioe.getMessage()); ioe.printStackTrace(); } catch (RuntimeException e) { logger.error("更新本地文件{}时发生异常,异常信息为:", fileName, e.getMessage()); e.printStackTrace(); } finally { IOUtils.closeQuietly(outputStream); } } private static byte[] getLocalFile(String name) { byte[] fileBytes = null; try { fileBytes=new FileReader(CommonConfig.DOWNLOAD_PATH + name).readBytes(); } catch (RuntimeException e) { logger.error("IpLookupUtils download MMDB files error, message is:" + e.getMessage()); e.printStackTrace(); } return fileBytes; } @Override public void cancel() { this.isRunning = false; } }