package com.zdjizhi.source; import cn.hutool.core.io.FileUtil; import cn.hutool.core.io.IoUtil; import cn.hutool.json.JSONObject; import com.alibaba.nacos.api.NacosFactory; import com.alibaba.nacos.api.PropertyKeyConst; import com.alibaba.nacos.api.config.ConfigService; import com.alibaba.nacos.api.config.listener.Listener; import com.fasterxml.jackson.databind.JavaType; import com.google.common.base.Joiner; import com.jayway.jsonpath.JsonPath; import com.zdjizhi.common.CommonConfig; import com.zdjizhi.common.CustomFile; import com.zdjizhi.common.KnowledgeLog; import com.zdjizhi.utils.*; import org.apache.commons.io.IOUtils; import org.apache.flink.configuration.Configuration; import org.apache.http.Header; import org.apache.http.message.BasicHeader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.*; import java.text.SimpleDateFormat; import java.util.*; import java.util.concurrent.Executor; public class HttpSource extends RichHttpSourceFunction> { private static final Logger logger = LoggerFactory.getLogger(HttpSource.class); private static final String EXPR = "$.[?(@.version=='latest' && @.name in ['ip_v4_built_in','ip_v6_built_in','ip_v4_user_defined','ip_v6_user_defined'])].['name','sha256','format','path']"; //连接nacos的配置 private Properties nacosProperties; //nacos data id private String NACOS_DATA_ID; //nacos group private String NACOS_GROUP; //nacos 连接超时时间 private long NACOS_READ_TIMEOUT; //上传到hdfs的路径 private String STORE_PATH; private ConfigService configService; // private static JsonMapper jsonMapperInstance = JsonMapper.getInstance(); // private static JavaType listType = jsonMapperInstance.createCollectionType(List.class, KnowledgeLog.class); private static Map updateMap = new HashMap<>(); private static HashMap knowledgeFileCache; private boolean isRunning = true; public HttpSource(Properties nacosProperties, String NACOS_DATA_ID, String NACOS_GROUP, long NACOS_READ_TIMEOUT, String storePath) { this.nacosProperties = nacosProperties; this.NACOS_DATA_ID = NACOS_DATA_ID; this.NACOS_GROUP = NACOS_GROUP; this.NACOS_READ_TIMEOUT = NACOS_READ_TIMEOUT; this.STORE_PATH = storePath; } @Override public void open(Configuration parameters) throws Exception { super.open(parameters); //初始化元数据缓存 updateMap = new HashMap<>(16); //初始化定位库缓存 knowledgeFileCache = new HashMap<>(16); logger.info("连接nacos:" + nacosProperties.getProperty(PropertyKeyConst.SERVER_ADDR)); configService = NacosFactory.createConfigService(nacosProperties); } @Override public void run(SourceContext ctx) throws Exception { // ctx.emitWatermark(new Watermark(Long.MAX_VALUE)); String config = configService.getConfig(NACOS_DATA_ID, NACOS_GROUP, NACOS_READ_TIMEOUT); SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String format = formatter.format(new Date()); logger.info(format + "receive config from nacos:" + config); System.out.println(format + "receive config from nacos:" + config); if (StringUtil.isNotBlank(config)) { ArrayList metaList = JsonPath.parse(config).read(EXPR); loadKnowledge(metaList); } configService.addListener(NACOS_DATA_ID, NACOS_GROUP, new Listener() { @Override public Executor getExecutor() { return null; } @Override public void receiveConfigInfo(String configMsg) { try { logger.info("receive update config:" + configMsg); if (StringUtil.isNotBlank(configMsg)) { ArrayList metaList = JsonPath.parse(configMsg).read(EXPR); if (metaList.size() >= 1) { for (Object metadata : metaList) { JSONObject knowledgeJson = new JSONObject(metadata, false, true); String fileName = Joiner.on(CommonConfig.LOCATION_SEPARATOR).useForNull("").join(knowledgeJson.getStr("name"), knowledgeJson.getStr("format")); String sha256 = knowledgeJson.getStr("sha256"); String filePath = knowledgeJson.getStr("path"); if (!sha256.equals(updateMap.get(fileName))) { updateMap.put(fileName, sha256); updateKnowledge(fileName, filePath); } } ctx.collect(knowledgeFileCache); } } } catch (Exception e) { logger.error("监听nacos配置失败", e); } System.out.println(configMsg); } }); while (isRunning) { Thread.sleep(10000); } } private void loadKnowledge(ArrayList metaList) { InputStream inputStream = null; try { if (metaList.size() >= 1) { for (Object metadata : metaList) { JSONObject knowledgeJson = new JSONObject(metadata, false, true); String fileName = Joiner.on(CommonConfig.LOCATION_SEPARATOR).useForNull("").join(knowledgeJson.getStr("name"), knowledgeJson.getStr("format")); String sha256 = knowledgeJson.getStr("sha256"); String filePath = knowledgeJson.getStr("path"); Header header = new BasicHeader("token", CommonConfig.HOS_TOKEN); HttpClientUtils2 httpClientUtils = new HttpClientUtils2(); inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header); updateMap.put(fileName, sha256); knowledgeFileCache.put(fileName, IOUtils.toByteArray(inputStream)); } } } catch (IOException ioException) { ioException.printStackTrace(); } finally { IOUtils.closeQuietly(inputStream); } } private void updateKnowledge(String fileName, String filePath) { InputStream inputStream = null; FileOutputStream outputStream = null; try { Header header = new BasicHeader("token", CommonConfig.HOS_TOKEN); HttpClientUtils2 httpClientUtils = new HttpClientUtils2(); inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header); byte[] bytes = IOUtils.toByteArray(inputStream); HdfsUtils.uploadFileByBytes(CommonConfig.HDFS_PATH + fileName, bytes); knowledgeFileCache.put(fileName, bytes); } catch (IOException ioException) { ioException.printStackTrace(); } finally { IOUtils.closeQuietly(inputStream); IOUtils.closeQuietly(outputStream); } } @Override public void cancel() { this.isRunning = false; } }