package com.zdjizhi.utils; import com.alibaba.nacos.api.NacosFactory; import com.alibaba.nacos.api.PropertyKeyConst; import com.alibaba.nacos.api.config.ConfigService; import com.alibaba.nacos.api.config.listener.Listener; import com.fasterxml.jackson.databind.JavaType; import com.zdjizhi.common.CommonConfig; import com.zdjizhi.common.CustomFile; import com.zdjizhi.common.KnowledgeLog; //import com.zdjizhi.function.source.RichHttpSourceFunction; import org.apache.commons.io.IOUtils; import org.apache.flink.configuration.Configuration; import org.apache.http.Header; import org.apache.http.message.BasicHeader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; import java.util.List; import java.util.Properties; import java.util.concurrent.Executor; public class HttpSource extends RichHttpSourceFunction> { private static final Logger logger = LoggerFactory.getLogger(HttpSource.class); //连接nacos的配置 private Properties nacosProperties; //nacos data id private String NACOS_DATA_ID; //nacos group private String NACOS_GROUP; //nacos 连接超时时间 private long NACOS_READ_TIMEOUT; //上传到hdfs的路径 private String STORE_PATH; private ConfigService configService; private static JsonMapper jsonMapperInstance = JsonMapper.getInstance(); private static JavaType listType = jsonMapperInstance.createCollectionType(List.class, KnowledgeLog.class); private boolean isRunning = true; public HttpSource(Properties nacosProperties, String NACOS_DATA_ID, String NACOS_GROUP, long NACOS_READ_TIMEOUT, String storePath) { this.nacosProperties = nacosProperties; this.NACOS_DATA_ID = NACOS_DATA_ID; this.NACOS_GROUP = NACOS_GROUP; this.NACOS_READ_TIMEOUT = NACOS_READ_TIMEOUT; this.STORE_PATH = storePath; } @Override public void open(Configuration parameters) throws Exception { super.open(parameters); logger.info("连接nacos:" + nacosProperties.getProperty(PropertyKeyConst.SERVER_ADDR)); configService = NacosFactory.createConfigService(nacosProperties); } @Override public void run(SourceContext ctx) throws Exception { String config = configService.getConfig(NACOS_DATA_ID, NACOS_GROUP, NACOS_READ_TIMEOUT); logger.info("receive config from nacos:" + config); System.out.println("receive config from nacos:" + config); List customFiles; //从nacos拿到配置后连接hos下载知识库到内存 customFiles = loadKnowledge(config); //将知识库传递到下一节点,即广播 ctx.collectWithTimestamp(customFiles,System.currentTimeMillis()); configService.addListener(NACOS_DATA_ID, NACOS_GROUP, new Listener() { @Override public Executor getExecutor() { return null; } @Override public void receiveConfigInfo(String configMsg) { try { logger.info("receive update config:" + configMsg); List customFiles = new ArrayList<>(); customFiles = loadKnowledge(configMsg); ctx.collectWithTimestamp(customFiles,System.currentTimeMillis()); //将更新后的文件重新上传至hdfs for (CustomFile customFile : customFiles) { logger.info("begin upload to hdfs:" + STORE_PATH + customFile.getFileName()); HdfsUtils.uploadFileByBytes(STORE_PATH + customFile.getFileName(),customFile.getContent()); logger.info(STORE_PATH + customFile.getFileName() + " upload finished"); } } catch (Exception e) { logger.error("监听nacos配置失败", e); } System.out.println(configMsg); } }); } //下载知识库到内存 public List loadKnowledge(String config) { List customFiles = new ArrayList<>(); List knowledges = jsonMapperInstance.fromJson(config, listType); for (KnowledgeLog knowledge : knowledges) { CustomFile customFile = new CustomFile(); String fileName = knowledge.getName().concat(".").concat(knowledge.getFormat()); customFile.setFileName(fileName); InputStream inputStream = null; try { Header header = new BasicHeader("token", CommonConfig.HOS_TOKEN); HttpClientUtils httpClientUtils = new HttpClientUtils(); inputStream = httpClientUtils.httpGetInputStream(knowledge.getPath(), 3000, header); byte[] bytes = IOUtils.toByteArray(inputStream); customFile.setContent(bytes); } catch (IOException ioException) { ioException.printStackTrace(); } finally { IOUtils.closeQuietly(inputStream); } customFiles.add(customFile); } return customFiles; } @Override public void cancel() { this.isRunning = false; } }