This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
galaxy-tsg-olap-dos-detecti…/src/main/java/com/zdjizhi/source/SingleHttpSource.java
2023-05-26 15:51:57 +08:00

255 lines
10 KiB
Java
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package com.zdjizhi.source;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.io.IoUtil;
import cn.hutool.core.io.file.FileReader;
import cn.hutool.crypto.digest.DigestUtil;
import cn.hutool.json.JSONObject;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import com.fasterxml.jackson.databind.JavaType;
import com.google.common.base.Joiner;
import com.jayway.jsonpath.JsonPath;
import com.zdjizhi.common.CommonConfig;
import com.zdjizhi.common.CustomFile;
import com.zdjizhi.common.KnowledgeLog;
import com.zdjizhi.utils.*;
import org.apache.commons.io.IOUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.http.Header;
import org.apache.http.message.BasicHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.Executor;
public class SingleHttpSource extends RichHttpSourceFunction<Map<String, byte[]>> {
private static final Logger logger = LoggerFactory.getLogger(SingleHttpSource.class);
private Properties nacosProperties;
private String NACOS_DATA_ID;
private String NACOS_GROUP;
private long NACOS_READ_TIMEOUT;
private static String STORE_PATH;
private static HttpClientUtils2 httpClientUtils ;
private ConfigService configService;
private static Header header;
private static final String EXPR = "$.[?(@.version=='latest' && @.name in ['ip_v4_built_in','ip_v6_built_in','ip_v4_user_defined','ip_v6_user_defined'])].['name','sha256','format','path']";
private static Map<String, String> knowledgeMetaCache = new HashMap<>();
private static HashMap<String, byte[]> knowledgeFileCache;
private boolean isRunning = true;
//是否下发,默认不发送
private boolean isSending = false;
private static final int TRY_TIMES = 3;
public SingleHttpSource(Properties nacosProperties, String NACOS_DATA_ID, String NACOS_GROUP, long NACOS_READ_TIMEOUT) {
this.nacosProperties = nacosProperties;
this.NACOS_DATA_ID = NACOS_DATA_ID;
this.NACOS_GROUP = NACOS_GROUP;
this.NACOS_READ_TIMEOUT = NACOS_READ_TIMEOUT;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
httpClientUtils = new HttpClientUtils2();
logger.info("连接nacos" + nacosProperties.getProperty(PropertyKeyConst.SERVER_ADDR));
configService = NacosFactory.createConfigService(nacosProperties);
//初始化元数据缓存
knowledgeMetaCache = new HashMap<>(16);
//初始化定位库缓存
knowledgeFileCache = new HashMap<>(16);
header = new BasicHeader("token", CommonConfig.HOS_TOKEN);
}
@Override
public void run(SourceContext ctx) throws Exception {
// ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
String config = configService.getConfig(NACOS_DATA_ID, NACOS_GROUP, NACOS_READ_TIMEOUT);
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String format = formatter.format(new Date());
logger.info(format + "receive config from nacos:" + config);
System.out.println(format + "receive config from nacos:" + config);
if (StringUtil.isNotBlank(config)) {
ArrayList<Object> metaList = JsonPath.parse(config).read(EXPR);
loadKnowledge(metaList);
if (isSending){
ctx.collect(knowledgeFileCache);
}
}
configService.addListener(NACOS_DATA_ID, NACOS_GROUP, new Listener() {
@Override
public Executor getExecutor() {
return null;
}
@Override
public void receiveConfigInfo(String configMsg) {
try {
logger.info("receive update config:" + configMsg);
if (StringUtil.isNotBlank(configMsg)) {
ArrayList<Object> metaList = JsonPath.parse(configMsg).read(EXPR);
if (metaList.size() >= 1) {
for (Object metadata : metaList) {
JSONObject knowledgeJson = new JSONObject(metadata, false, true);
String fileName = Joiner.on(CommonConfig.LOCATION_SEPARATOR).useForNull("").join(knowledgeJson.getStr("name"),
knowledgeJson.getStr("format"));
String sha256 = knowledgeJson.getStr("sha256");
String filePath = knowledgeJson.getStr("path");
if (!sha256.equals(knowledgeMetaCache.get(fileName))) {
knowledgeMetaCache.put(fileName, sha256);
updateKnowledge(fileName, filePath,sha256);
}
}
if (isSending){
ctx.collect(knowledgeFileCache);
}
}
}
} catch (Exception e) {
logger.error("监听nacos配置失败", e);
}
System.out.println(configMsg);
}
});
while (isRunning) {
Thread.sleep(10000);
}
}
private void loadKnowledge(ArrayList<Object> metaList) {
// InputStream inputStream = null;
try {
if (metaList.size() >= 1) {
for (Object metadata : metaList) {
JSONObject knowledgeJson = new JSONObject(metadata, false, true);
String fileName = Joiner.on(CommonConfig.LOCATION_SEPARATOR).useForNull("").join(knowledgeJson.getStr("name"),
knowledgeJson.getStr("format"));
String sha256 = knowledgeJson.getStr("sha256");
String filePath = knowledgeJson.getStr("path");
byte[] localFileByte = getLocalFile(fileName);
String localFileSha256Hex = DigestUtil.sha256Hex(localFileByte);
if (sha256.equals(localFileSha256Hex)){
logger.info("本地文件{}的sha256为:{} ,Nacos内记录为:{} ,sha256相等", fileName, localFileSha256Hex, sha256);
knowledgeMetaCache.put(fileName, sha256);
knowledgeFileCache.put(fileName, localFileByte);
}else {
logger.info("本地文件{}的sha256为:{} ,Nacos内记录为:{} ,sha256不相等更新本地文件及缓存", fileName, localFileSha256Hex, sha256);
updateKnowledge(fileName,filePath,sha256);
}
}
}
} catch (RuntimeException exception) {
exception.printStackTrace();
}
// finally {
// IOUtils.closeQuietly(inputStream);
// }
}
private void updateKnowledge(String fileName, String filePath,String sha256) {
InputStream inputStream = null;
// FileOutputStream outputStream = null;
int retryNum = 0;
try {
while (retryNum < TRY_TIMES){
inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header);
if (inputStream !=null){
byte[] downloadBytes = IOUtils.toByteArray(inputStream);
if (downloadBytes.length>0){
String downloadFileSha256Hex = DigestUtil.sha256Hex(downloadBytes);
if (sha256.equals(downloadFileSha256Hex)){
logger.info("通过HOS下载{}的sha256为:{} ,Nacos内记录为:{} ,sha256相等", fileName, sha256);
knowledgeMetaCache.put(fileName, sha256);
knowledgeFileCache.put(fileName, downloadBytes);
updateLocalFile(fileName);
retryNum = TRY_TIMES;
isSending = true;
}else {
logger.error("通过HOS下载{}的sha256为:{} ,Nacos内记录为:{} ,sha256不相等 开始第{}次重试下载文件", fileName, downloadFileSha256Hex, sha256, retryNum);
retryNum++;
}
}
}
}
} catch (IOException ioException) {
ioException.printStackTrace();
} finally {
IOUtils.closeQuietly(inputStream);
}
}
private void updateLocalFile(String fileName) {
// InputStream inputStream = null;
FileOutputStream outputStream = null;
try {
FileUtil.mkdir(CommonConfig.DOWNLOAD_PATH);
File file = new File(CommonConfig.DOWNLOAD_PATH.concat(File.separator).concat(fileName));
outputStream = new FileOutputStream(file);
IoUtil.copy(new ByteArrayInputStream(knowledgeFileCache.get(fileName)), outputStream);
} catch (IOException ioe) {
logger.error("更新本地文件{}时发生IO异常,异常信息为:", fileName, ioe.getMessage());
ioe.printStackTrace();
} catch (RuntimeException e) {
logger.error("更新本地文件{}时发生异常,异常信息为:", fileName, e.getMessage());
e.printStackTrace();
} finally {
IOUtils.closeQuietly(outputStream);
}
}
private static byte[] getLocalFile(String name) {
byte[] fileBytes = null;
try {
fileBytes=new FileReader(CommonConfig.DOWNLOAD_PATH + name).readBytes();
} catch (RuntimeException e) {
logger.error("IpLookupUtils download MMDB files error, message is:" + e.getMessage());
e.printStackTrace();
}
return fileBytes;
}
@Override
public void cancel() {
this.isRunning = false;
}
}