GAL-349 优化DoS检测程序知识库更新流程

This commit is contained in:
unknown
2023-06-08 16:57:19 +08:00
parent 6fb37324ff
commit b9a694ddb9
6 changed files with 256 additions and 144 deletions

View File

@@ -9,6 +9,7 @@ import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;
import com.fasterxml.jackson.databind.JavaType;
import com.google.common.base.Joiner;
import com.jayway.jsonpath.JsonPath;
@@ -29,17 +30,16 @@ import java.util.*;
import java.util.concurrent.Executor;
public class HttpSource extends RichHttpSourceFunction<Map<String, byte[]>> {
public class HttpSource extends RichHttpSourceFunction<Map<String, String>> {
private static final Logger logger = LoggerFactory.getLogger(HttpSource.class);
private static final int TRY_TIMES = 3;
private static final String EXPR = "$.[?(@.version=='latest' && @.name in ['ip_v4_built_in','ip_v6_built_in','ip_v4_user_defined','ip_v6_user_defined'])].['name','sha256','format','path']";
// private static final String EXPR = "$.[?(@.version=='latest' && @.name in ['ip_v4_user_defined'])].['name','sha256','format','path']";
private static Map<String, String> knowledgeMetaCache = new HashMap<>();
private static HashMap<String, byte[]> knowledgeFileCache;
private static HashMap<String, String> knowledgeUpdateCache;
private static final int TRY_TIMES = 3;
private static HttpClientUtils2 httpClientUtils;
@@ -55,9 +55,6 @@ public class HttpSource extends RichHttpSourceFunction<Map<String, byte[]>> {
//nacos 连接超时时间
private long NACOS_READ_TIMEOUT;
//上传到hdfs的路径
private String STORE_PATH;
private ConfigService configService;
private static Header header;
@@ -68,17 +65,11 @@ public class HttpSource extends RichHttpSourceFunction<Map<String, byte[]>> {
private boolean isSending = false;
// private boolean isRunning = true;
public HttpSource(Properties nacosProperties, String NACOS_DATA_ID, String NACOS_GROUP, long NACOS_READ_TIMEOUT, String storePath) {
public HttpSource(Properties nacosProperties, String NACOS_DATA_ID, String NACOS_GROUP, long NACOS_READ_TIMEOUT) {
this.nacosProperties = nacosProperties;
this.NACOS_DATA_ID = NACOS_DATA_ID;
this.NACOS_GROUP = NACOS_GROUP;
this.NACOS_READ_TIMEOUT = NACOS_READ_TIMEOUT;
this.STORE_PATH = storePath;
}
@Override
@@ -88,22 +79,29 @@ public class HttpSource extends RichHttpSourceFunction<Map<String, byte[]>> {
//初始化元数据缓存
knowledgeMetaCache = new HashMap<>(16);
//初始化定位库缓存
knowledgeFileCache = new HashMap<>(16);
knowledgeUpdateCache = new HashMap<>(16);
header = new BasicHeader("token", CommonConfig.HOS_TOKEN);
//连接nacos配置
try {
configService = NacosFactory.createConfigService(nacosProperties);
}catch (NacosException e){
logger.error("Get Schema config from Nacos error,The exception message is :{}", e.getMessage());
}
//初始化知识库
initKnowledge();
logger.info("连接nacos" + nacosProperties.getProperty(PropertyKeyConst.SERVER_ADDR));
configService = NacosFactory.createConfigService(nacosProperties);
}
@Override
public void run(SourceContext ctx) throws Exception {
// ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
String config = configService.getConfig(NACOS_DATA_ID, NACOS_GROUP, NACOS_READ_TIMEOUT);
if (StringUtil.isNotBlank(config)) {
ArrayList<Object> metaList = JsonPath.parse(config).read(EXPR);
loadKnowledge(metaList);
if (isSending){
ctx.collect(knowledgeFileCache);
}
if (!knowledgeUpdateCache.isEmpty()){
ctx.collect(knowledgeUpdateCache);
knowledgeUpdateCache.clear();
}
// }
configService.addListener(NACOS_DATA_ID, NACOS_GROUP, new Listener() {
@@ -118,7 +116,7 @@ public class HttpSource extends RichHttpSourceFunction<Map<String, byte[]>> {
logger.info("receive update config:" + configMsg);
if (StringUtil.isNotBlank(configMsg)) {
ArrayList<Object> metaList = JsonPath.parse(configMsg).read(EXPR);
if (metaList.size() >= 1) {
if (metaList.size() > 0) {
for (Object metadata : metaList) {
JSONObject knowledgeJson = new JSONObject(metadata, false, true);
String fileName = Joiner.on(CommonConfig.LOCATION_SEPARATOR).useForNull("").join(knowledgeJson.getStr("name"),
@@ -130,8 +128,9 @@ public class HttpSource extends RichHttpSourceFunction<Map<String, byte[]>> {
updateKnowledge(fileName, filePath,sha256);
}
}
if (isSending){
ctx.collect(knowledgeFileCache);
if (!knowledgeUpdateCache.isEmpty()){
ctx.collect(knowledgeUpdateCache);
knowledgeUpdateCache.clear();
}
}
}
@@ -139,7 +138,6 @@ public class HttpSource extends RichHttpSourceFunction<Map<String, byte[]>> {
} catch (Exception e) {
logger.error("监听nacos配置失败", e);
}
System.out.println(configMsg);
}
});
@@ -153,10 +151,17 @@ public class HttpSource extends RichHttpSourceFunction<Map<String, byte[]>> {
}
private void loadKnowledge(ArrayList<Object> metaList) {
InputStream inputStream = null;
private void initKnowledge(){
String configMsg = "";
try {
if (metaList.size() >= 1) {
configMsg=configService.getConfig(NACOS_DATA_ID, NACOS_GROUP, NACOS_READ_TIMEOUT);
} catch (NacosException e) {
logger.error("从Nacos获取知识库元数据配置文件异常异常信息为:{}", e.getMessage());
}
if (StringUtil.isNotBlank(configMsg)){
ArrayList<Object> metaList = JsonPath.parse(configMsg).read(EXPR);
if (metaList.size() > 0) {
for (Object metadata : metaList) {
JSONObject knowledgeJson = new JSONObject(metadata, false, true);
String fileName = Joiner.on(CommonConfig.LOCATION_SEPARATOR).useForNull("").join(knowledgeJson.getStr("name"),
@@ -168,8 +173,6 @@ public class HttpSource extends RichHttpSourceFunction<Map<String, byte[]>> {
if (sha256.equals(localFileSha256Hex)){
logger.info("本地文件{}的sha256为:{} ,Nacos内记录为:{} ,sha256相等", fileName, localFileSha256Hex, sha256);
knowledgeMetaCache.put(fileName, sha256);
// knowledgeFileCache.put(fileName, IOUtils.toByteArray(inputStream));
knowledgeFileCache.put(fileName, localFileByte);
}else {
logger.info("本地文件{}的sha256为:{} ,Nacos内记录为:{} ,sha256不相等更新本地文件及缓存", fileName, localFileSha256Hex, sha256);
updateKnowledge(fileName,filePath,sha256);
@@ -177,55 +180,54 @@ public class HttpSource extends RichHttpSourceFunction<Map<String, byte[]>> {
}
}
// } catch (IOException ioException) {
} catch (Exception ioException) {
ioException.printStackTrace();
} finally {
IOUtils.closeQuietly(inputStream);
}
}
private void updateKnowledge(String fileName, String filePath,String sha256) {
InputStream inputStream = null;
// FileOutputStream outputStream = null;
int retryNum = 0;
try {
while (retryNum < TRY_TIMES){
inputStream = httpClientUtils.httpGetInputStream(filePath, 90000, header);
if (inputStream !=null){
byte[] downloadBytes = IOUtils.toByteArray(inputStream);
if (downloadBytes.length>0){
String downloadFileSha256Hex = DigestUtil.sha256Hex(downloadBytes);
if (sha256.equals(downloadFileSha256Hex)){
logger.info("通过HOS下载{}的sha256为:{} ,Nacos内记录为:{} ,sha256相等", fileName, sha256);
// HdfsUtils.uploadFileByBytes(CommonConfig.HDFS_PATH + fileName, downloadBytes);
String downloadFileSha256Hex = DigestUtil.sha256Hex(downloadBytes);
if (sha256.equals(downloadFileSha256Hex)&& downloadBytes.length > 0 ){
logger.info("通过HOS下载{}的sha256为:{} ,Nacos内记录为:{} ,sha256相等", fileName, sha256);
boolean updateStatus = updateLocalFile(fileName, downloadBytes);
if (updateStatus){
knowledgeMetaCache.put(fileName,sha256);
knowledgeFileCache.put(fileName, downloadBytes);
updateLocalFile(fileName);
knowledgeUpdateCache.put(fileName, sha256);
retryNum = TRY_TIMES;
isSending = true;
}else {
logger.error("通过HOS下载{}的sha256为:{} ,Nacos内记录为:{} ,sha256不相等 开始第{}次重试下载文件", fileName, downloadFileSha256Hex, sha256, retryNum);
}else {
retryNum++;
//避免频繁请求HOS
Thread.sleep(10000);
}
}else {
logger.error("通过HOS下载{}的sha256为:{} ,Nacos内记录为:{} ,sha256不相等 开始第{}次重试下载文件", fileName, downloadFileSha256Hex, sha256, retryNum);
retryNum++;
//避免频繁请求HOS
Thread.sleep(10000);
}
}
}
} catch (IOException ioException) {
ioException.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
IOUtils.closeQuietly(inputStream);
// IOUtils.closeQuietly(outputStream);
}
}
private void updateLocalFile(String fileName) {
private boolean updateLocalFile(String fileName,byte[] downloadBytes) {
FileOutputStream outputStream = null;
boolean updateStatus = false;
try {
HdfsUtils.uploadFileByBytes(CommonConfig.HDFS_PATH + fileName, knowledgeFileCache.get(fileName));
HdfsUtils.uploadFileByBytes(CommonConfig.HDFS_PATH + fileName, downloadBytes);
updateStatus=true;
} catch (IOException ioe) {
logger.error("更新本地文件{}时发生IO异常,异常信息为:", fileName, ioe.getMessage());
ioe.printStackTrace();
@@ -235,7 +237,7 @@ public class HttpSource extends RichHttpSourceFunction<Map<String, byte[]>> {
} finally {
IOUtils.closeQuietly(outputStream);
}
return updateStatus;
}
private static byte[] getLocalFile(String name) {