package com.zdjizhi.source; import cn.hutool.core.io.FileUtil; import cn.hutool.core.io.IoUtil; import cn.hutool.core.io.file.FileReader; import cn.hutool.crypto.digest.DigestUtil; import cn.hutool.json.JSONObject; import com.alibaba.nacos.api.NacosFactory; import com.alibaba.nacos.api.PropertyKeyConst; import com.alibaba.nacos.api.config.ConfigService; import com.alibaba.nacos.api.config.listener.Listener; import com.alibaba.nacos.api.exception.NacosException; import com.fasterxml.jackson.databind.JavaType; import com.google.common.base.Joiner; import com.jayway.jsonpath.JsonPath; import com.zdjizhi.common.CommonConfig; import com.zdjizhi.common.CustomFile; import com.zdjizhi.common.KnowledgeLog; import com.zdjizhi.utils.*; import org.apache.commons.io.IOUtils; import org.apache.flink.configuration.Configuration; import org.apache.http.Header; import org.apache.http.message.BasicHeader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.*; import java.text.SimpleDateFormat; import java.util.*; import java.util.concurrent.Executor; public class HttpSource extends RichHttpSourceFunction> { private static final Logger logger = LoggerFactory.getLogger(HttpSource.class); private static final String EXPR = "$.[?(@.version=='latest' && @.name in ['ip_v4_built_in','ip_v6_built_in','ip_v4_user_defined','ip_v6_user_defined'])].['name','sha256','format','path']"; private static Map knowledgeMetaCache = new HashMap<>(); private static HashMap knowledgeUpdateCache; private static final int TRY_TIMES = 3; private static HttpClientUtils2 httpClientUtils; //连接nacos的配置 private Properties nacosProperties; //nacos data id private String NACOS_DATA_ID; //nacos group private String NACOS_GROUP; //nacos 连接超时时间 private long NACOS_READ_TIMEOUT; private ConfigService configService; private static Header header; //运行状态cancel时置为false private boolean isRunning = true; //是否下发,默认不发送 private boolean isSending = false; public HttpSource(Properties nacosProperties, String NACOS_DATA_ID, String NACOS_GROUP, long NACOS_READ_TIMEOUT) { this.nacosProperties = nacosProperties; this.NACOS_DATA_ID = NACOS_DATA_ID; this.NACOS_GROUP = NACOS_GROUP; this.NACOS_READ_TIMEOUT = NACOS_READ_TIMEOUT; } @Override public void open(Configuration parameters) throws Exception { super.open(parameters); httpClientUtils = new HttpClientUtils2(); //初始化元数据缓存 knowledgeMetaCache = new HashMap<>(16); //初始化定位库缓存 knowledgeUpdateCache = new HashMap<>(16); header = new BasicHeader("token", CommonConfig.HOS_TOKEN); //连接nacos配置 try { configService = NacosFactory.createConfigService(nacosProperties); }catch (NacosException e){ logger.error("Get Schema config from Nacos error,The exception message is :{}", e.getMessage()); } //初始化知识库 initKnowledge(); logger.info("连接nacos:" + nacosProperties.getProperty(PropertyKeyConst.SERVER_ADDR)); } @Override public void run(SourceContext ctx) throws Exception { if (!knowledgeUpdateCache.isEmpty()){ ctx.collect(knowledgeUpdateCache); knowledgeUpdateCache.clear(); } // } configService.addListener(NACOS_DATA_ID, NACOS_GROUP, new Listener() { @Override public Executor getExecutor() { return null; } @Override public void receiveConfigInfo(String configMsg) { try { logger.info("receive update config:" + configMsg); if (StringUtil.isNotBlank(configMsg)) { ArrayList metaList = JsonPath.parse(configMsg).read(EXPR); if (metaList.size() > 0) { for (Object metadata : metaList) { JSONObject knowledgeJson = new JSONObject(metadata, false, true); String fileName = Joiner.on(CommonConfig.LOCATION_SEPARATOR).useForNull("").join(knowledgeJson.getStr("name"), knowledgeJson.getStr("format")); String sha256 = knowledgeJson.getStr("sha256"); String filePath = knowledgeJson.getStr("path"); if (!sha256.equals(knowledgeMetaCache.get(fileName))) { knowledgeMetaCache.put(fileName, sha256); updateKnowledge(fileName, filePath,sha256); } } if (!knowledgeUpdateCache.isEmpty()){ ctx.collect(knowledgeUpdateCache); knowledgeUpdateCache.clear(); } } } } catch (Exception e) { logger.error("监听nacos配置失败", e); } } }); while (isRunning) { try { Thread.sleep(10000); }catch (InterruptedException e) { e.printStackTrace(); } } } private void initKnowledge(){ String configMsg = ""; try { configMsg=configService.getConfig(NACOS_DATA_ID, NACOS_GROUP, NACOS_READ_TIMEOUT); } catch (NacosException e) { logger.error("从Nacos获取知识库元数据配置文件异常,异常信息为:{}", e.getMessage()); } if (StringUtil.isNotBlank(configMsg)){ ArrayList metaList = JsonPath.parse(configMsg).read(EXPR); if (metaList.size() > 0) { for (Object metadata : metaList) { JSONObject knowledgeJson = new JSONObject(metadata, false, true); String fileName = Joiner.on(CommonConfig.LOCATION_SEPARATOR).useForNull("").join(knowledgeJson.getStr("name"), knowledgeJson.getStr("format")); String sha256 = knowledgeJson.getStr("sha256"); String filePath = knowledgeJson.getStr("path"); byte[] localFileByte = getLocalFile(fileName); String localFileSha256Hex = DigestUtil.sha256Hex(localFileByte); if (sha256.equals(localFileSha256Hex)){ logger.info("本地文件{}的sha256为:{} ,Nacos内记录为:{} ,sha256相等", fileName, localFileSha256Hex, sha256); knowledgeMetaCache.put(fileName, sha256); }else { logger.info("本地文件{}的sha256为:{} ,Nacos内记录为:{} ,sha256不相等,更新本地文件及缓存", fileName, localFileSha256Hex, sha256); updateKnowledge(fileName,filePath,sha256); } } } } } private void updateKnowledge(String fileName, String filePath,String sha256) { InputStream inputStream = null; int retryNum = 0; try { while (retryNum < TRY_TIMES){ inputStream = httpClientUtils.httpGetInputStream(filePath, 90000, header); if (inputStream !=null){ byte[] downloadBytes = IOUtils.toByteArray(inputStream); String downloadFileSha256Hex = DigestUtil.sha256Hex(downloadBytes); if (sha256.equals(downloadFileSha256Hex)&& downloadBytes.length > 0 ){ logger.info("通过HOS下载{}的sha256为:{} ,Nacos内记录为:{} ,sha256相等", fileName, sha256); boolean updateStatus = updateLocalFile(fileName, downloadBytes); if (updateStatus){ knowledgeMetaCache.put(fileName,sha256); knowledgeUpdateCache.put(fileName, sha256); retryNum = TRY_TIMES; }else { retryNum++; //避免频繁请求HOS Thread.sleep(10000); } }else { logger.error("通过HOS下载{}的sha256为:{} ,Nacos内记录为:{} ,sha256不相等 开始第{}次重试下载文件", fileName, downloadFileSha256Hex, sha256, retryNum); retryNum++; //避免频繁请求HOS Thread.sleep(10000); } } } } catch (IOException ioException) { ioException.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } finally { IOUtils.closeQuietly(inputStream); } } private boolean updateLocalFile(String fileName,byte[] downloadBytes) { FileOutputStream outputStream = null; boolean updateStatus = false; try { HdfsUtils.uploadFileByBytes(CommonConfig.HDFS_PATH + fileName, downloadBytes); updateStatus=true; } catch (IOException ioe) { logger.error("更新本地文件{}时发生IO异常,异常信息为:", fileName, ioe.getMessage()); ioe.printStackTrace(); } catch (RuntimeException e) { logger.error("更新本地文件{}时发生异常,异常信息为:", fileName, e.getMessage()); e.printStackTrace(); } finally { IOUtils.closeQuietly(outputStream); } return updateStatus; } private static byte[] getLocalFile(String name) { byte[] fileBytes = null; try { fileBytes = HdfsUtils.getFileBytes(CommonConfig.HDFS_PATH + name) ; } catch (RuntimeException | IOException e) { logger.error("IpLookupUtils download MMDB files error, message is:" + e.getMessage()); e.printStackTrace(); } return fileBytes; } @Override public void cancel() { this.isRunning = false; } }