Compare commits

10 Commits

| Author | SHA1 | Date |
|---|---|---|
| | 24d70f690e | |
| | 77e982b22f | |
| | b3a23686a0 | |
| | b9a694ddb9 | |
| | 6fb37324ff | |
| | 315b638470 | |
| | bd48417eb8 | |
| | 72acc976e3 | |
| | 6be3ea7f1e | |
| | 04ee45f77d | |
pom.xml

```diff
@@ -20,7 +20,7 @@
 <repository>
 <id>nexus</id>
 <name>Team Nexus Repository</name>
-<url>http://192.168.40.125:8099/content/groups/public</url>
+<url>http://192.168.40.153:8099/content/groups/public</url>
 </repository>
 
 <repository>
@@ -196,7 +196,6 @@
 <!-- <groupId>org.apache.hadoop</groupId>-->
 <!-- <artifactId>hadoop-hdfs</artifactId>-->
 <!-- <version>${hadoop.version}</version>-->
 <!-- <scope>provided</scope>-->
 <!-- </dependency>-->
 
 <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
@@ -262,7 +261,7 @@
 <dependency>
 <groupId>com.zdjizhi</groupId>
 <artifactId>galaxy</artifactId>
-<version>1.1.1</version>
+<version>1.1.3</version>
 <exclusions>
 <exclusion>
 <artifactId>slf4j-log4j12</artifactId>
@@ -279,6 +278,13 @@
 </exclusions>
 </dependency>
 
+
+<dependency>
+<groupId>com.alibaba.fastjson2</groupId>
+<artifactId>fastjson2</artifactId>
+<version>2.0.32</version>
+</dependency>
+
 <dependency>
 <groupId>com.alibaba.nacos</groupId>
 <artifactId>nacos-client</artifactId>
```
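The dependency changes above bump the team Nexus mirror, bump the galaxy library to 1.1.3, and introduce fastjson2 2.0.32, which the Java hunks below switch to in place of the Jackson-based JsonMapper. A minimal sketch of the fastjson2 calls those hunks rely on; the Item class and sample JSON are illustrative, only the API calls come from the diff:

```java
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;

import java.util.HashMap;
import java.util.List;

public class Fastjson2Demo {
    // Hypothetical POJO standing in for DosVsysId / DosDetectionThreshold.
    public static class Item {
        public int id;
        public String name;
    }

    public static void main(String[] args) {
        // String -> Map, replacing jsonMapperInstance.fromJson(s, hashmapJsonType)
        HashMap<String, Object> map =
                JSONObject.parseObject("{\"id\":1,\"name\":\"a\"}", HashMap.class);

        // Object -> JSON string, replacing jsonMapperInstance.toJson(obj)
        String json = JSONObject.toJSONString(map);

        // JSON array -> typed list, replacing the JavaType-based collection parsing
        List<Item> items = JSON.parseArray("[{\"id\":1,\"name\":\"a\"}]", Item.class);

        System.out.println(json + " / " + items.size());
    }
}
```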
DosDetection.java

```diff
@@ -1,5 +1,6 @@
 package com.zdjizhi.etl;
 
+import cn.hutool.core.math.MathUtil;
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
 import com.zdjizhi.common.*;
@@ -23,7 +24,7 @@ import java.util.concurrent.TimeUnit;
 /**
 * @author wlh
 */
-public class DosDetection extends BroadcastProcessFunction<DosSketchLog,Map<String, byte[]>, DosEventLog> {
+public class DosDetection extends BroadcastProcessFunction<DosSketchLog,Map<String, String>, DosEventLog> {
 
 private static final Log logger = LogFactory.get();
 private static Map<String, Map<String, DosBaselineThreshold>> baselineMap = new HashMap<>();
@@ -99,8 +100,10 @@ public class DosDetection extends BroadcastProcessFunction<DosSketchLog,Map<Stri
 }
 
 @Override
-public void processBroadcastElement(Map<String, byte[]> value, Context ctx, Collector<DosEventLog> out) throws Exception {
-IpUtils.updateIpLook(value);
+public void processBroadcastElement(Map<String, String> value, Context ctx, Collector<DosEventLog> out) throws Exception {
+if (!value.isEmpty()){
+IpUtils.updateIpLook(value);
+}
 }
 
 private DosEventLog getDosEventLogBySensitivityThreshold(DosSketchLog value) {
@@ -120,22 +123,32 @@ public class DosDetection extends BroadcastProcessFunction<DosSketchLog,Map<Stri
 }
 
 private DosEventLog getDosEventLogByStaticThreshold(DosSketchLog value, DosDetectionThreshold threshold) throws CloneNotSupportedException {
-long base = threshold.getSessionsPerSec();
-long diff = value.getSketch_sessions() - base;
-long profileId = threshold.getProfileId();
-DosEventLog result = getDosEventLog(value, base, diff, profileId, STATIC_CONDITION_TYPE, SESSIONS_TAG);
-if (result == null) {
-base = threshold.getPacketsPerSec();
-diff = value.getSketch_packets() - base;
+long sessionBase = threshold.getSessionsPerSec();
+long pktBase=threshold.getPacketsPerSec();
+long bitBase=threshold.getBitsPerSec();
+
+long diffSession = value.getSketch_sessions() - sessionBase;
+long diffPkt = value.getSketch_packets() - pktBase;
+long diffByte = value.getSketch_bytes() - bitBase;
+
+Double diffSessionPercent = getDiffPercent(diffSession, sessionBase)*100;
+Double diffPktPercent = getDiffPercent(diffPkt, pktBase)*100;
+Double diffBitPercent = getDiffPercent(diffByte, bitBase)*100;
+
+long profileId = 0;
+DosEventLog result =null;
+
+if (diffSessionPercent >= diffPktPercent && diffSessionPercent >= diffBitPercent){
+profileId = threshold.getProfileId();
-result = getDosEventLog(value, base, diff,profileId, STATIC_CONDITION_TYPE, PACKETS_TAG);
-if (result == null) {
-base = threshold.getBitsPerSec();
-diff = value.getSketch_bytes() - base;
-profileId=threshold.getProfileId();
-result = getDosEventLog(value, base, diff, profileId, STATIC_CONDITION_TYPE, BITS_TAG);
-}
+result= getDosEventLog(value, sessionBase, diffSession, profileId, STATIC_CONDITION_TYPE, SESSIONS_TAG);
+}else if (diffPktPercent >= diffSessionPercent && diffPktPercent >= diffBitPercent){
+profileId = threshold.getProfileId();
+result = getDosEventLog(value, pktBase, diffPkt,profileId, STATIC_CONDITION_TYPE, PACKETS_TAG);
+}else if (diffBitPercent >= diffPktPercent && diffBitPercent >= diffSessionPercent){
+profileId = threshold.getProfileId();
+result = getDosEventLog(value, bitBase, diffByte, profileId, STATIC_CONDITION_TYPE, BITS_TAG);
+}
 }
 
 /*
 ArrayList<DosEventLog> dosEventLogs = new ArrayList<>();
 if (result != null){
@@ -168,7 +181,8 @@ public class DosDetection extends BroadcastProcessFunction<DosSketchLog,Map<Stri
 }else if ((type == BASELINE_CONDITION_TYPE || type == SENSITIVITY_CONDITION_TYPE) && value.getSketch_sessions() < staticSensitivityThreshold){
 logger.debug("Current server IP:{}, type:{}, baseline {} percent {} did not exceed the static sensitivity threshold, log detail\n{}",destinationIp, attackType, base, percent, value);
 }else {
-result = getResult(value, base, profileId, severity, percent+1, type, tag);
+// result = getResult(value, base, profileId, severity, percent+1, type, tag);
+result = getResult(value, base, profileId, severity, percent, type, tag);
 if (type == SENSITIVITY_CONDITION_TYPE){
 result.setSeverity(Severity.MAJOR.severity);
 }
@@ -190,7 +204,8 @@ public class DosDetection extends BroadcastProcessFunction<DosSketchLog,Map<Stri
 dosEventLog.setProfile_id(profileId);
 dosEventLog.setAttack_type(value.getAttack_type());
 dosEventLog.setSeverity(severity.severity);
-dosEventLog.setConditions(getConditions(PERCENT_INSTANCE.format(percent), base, value.getSketch_sessions(), type, tag));
+dosEventLog.setConditions(getConditions(PERCENT_INSTANCE.format(percent), base, value.getSketch_sessions(), type, tag,dosEventLog));
+// dosEventLog.setConditions(getConditions(percent, base, value.getSketch_sessions(), type, tag,dosEventLog));
 dosEventLog.setDestination_ip(value.getDestination_ip());
 dosEventLog.setDestination_country(IpUtils.ipLookup.countryLookup(value.getDestination_ip()));
 String ipList = value.getSource_ip();
@@ -227,12 +242,24 @@ public class DosDetection extends BroadcastProcessFunction<DosSketchLog,Map<Stri
 return base;
 }
 
-private String getConditions(String percent, long base, long sessions, int type, String tag) {
+private String getConditions(String percent, long base, long sessions, int type, String tag,DosEventLog dosEventLog) {
+int condition =0;
+if ("Minor".equals(dosEventLog.getSeverity())){
+condition=50;
+}else if ("Warning".equals(dosEventLog.getSeverity())){
+condition=100;
+}else if ("Major".equals(dosEventLog.getSeverity())){
+condition=250;
+}else if ("Severe".equals(dosEventLog.getSeverity())){
+condition=500;
+}else if ("Critical".equals(dosEventLog.getSeverity())){
+condition =800;
+}
 switch (type) {
 case STATIC_CONDITION_TYPE:
 return "Rate > " +
 base + " " +
-tag + "/s";
+tag + "/s" + "(>"+condition+"%)";
 case BASELINE_CONDITION_TYPE:
 return tag + " > " +
 percent + " of baseline";
```
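The rewritten getDosEventLogByStaticThreshold no longer checks sessions, then packets, then bits in sequence; it computes the percentage by which each observed rate exceeds its static threshold and reports only the dimension with the largest exceedance. The getConditions change then appends a severity-dependent percentage to the condition string. A standalone sketch of that selection and mapping, with getDiffPercent and the event construction reduced to plain values (helper names here are illustrative, not the project's):

```java
public class StaticThresholdSketch {

    // Mirrors the idea of getDiffPercent: relative exceedance of observed over base.
    static double diffPercent(long diff, long base) {
        return base == 0 ? 0.0 : (double) diff / base;
    }

    // Pick the dimension whose exceedance percentage is largest.
    static String pickDimension(long sessions, long packets, long bytes,
                                long sessionBase, long pktBase, long bitBase) {
        double s = diffPercent(sessions - sessionBase, sessionBase) * 100;
        double p = diffPercent(packets - pktBase, pktBase) * 100;
        double b = diffPercent(bytes - bitBase, bitBase) * 100;
        if (s >= p && s >= b) return "sessions";
        if (p >= s && p >= b) return "packets";
        return "bits";
    }

    // Mirrors the severity -> condition percentage mapping added to getConditions.
    static int conditionPercent(String severity) {
        switch (severity) {
            case "Minor":    return 50;
            case "Warning":  return 100;
            case "Major":    return 250;
            case "Severe":   return 500;
            case "Critical": return 800;
            default:         return 0;
        }
    }

    public static void main(String[] args) {
        // Values loosely follow the DNS-Flood test data; thresholds are illustrative.
        System.out.println(pickDimension(945, 945, 446370, 100000, 500, 1000000)); // packets
        System.out.println("Rate > " + 500 + " " + "packets" + "/s" + "(>" + conditionPercent("Major") + "%)");
    }
}
```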
ParseBaselineThreshold.java

```diff
@@ -38,6 +38,8 @@ public class ParseBaselineThreshold {
 config.set("hbase.client.retries.number", "3");
 config.set("hbase.bulkload.retries.number", "3");
 config.set("zookeeper.recovery.retry", "3");
+config.set("hbase.defaults.for.version", "2.2.3");
+config.set("hbase.defaults.for.version.skip", "true");
 config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, CommonConfig.HBASE_CLIENT_OPERATION_TIMEOUT);
 config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CommonConfig.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
 
```
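The two added settings pin the HBase client's defaults version to 2.2.3 and skip the strict version check, on top of the retry and timeout settings already there. A minimal sketch of building such a client Configuration; the timeout values are placeholders for the CommonConfig constants:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;

public class HBaseConfigSketch {
    static Configuration build() {
        Configuration config = HBaseConfiguration.create();
        config.set("hbase.client.retries.number", "3");
        config.set("hbase.bulkload.retries.number", "3");
        config.set("zookeeper.recovery.retry", "3");
        // Added in this change set: declare the defaults version and skip the check.
        config.set("hbase.defaults.for.version", "2.2.3");
        config.set("hbase.defaults.for.version.skip", "true");
        // 30_000 ms stands in for CommonConfig.HBASE_CLIENT_* values.
        config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 30_000);
        config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30_000);
        return config;
    }
}
```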
ParseSketchLog.java

```diff
@@ -1,11 +1,12 @@
 package com.zdjizhi.etl;
 
+import com.alibaba.fastjson2.JSONObject;
 import com.fasterxml.jackson.databind.JavaType;
 import com.zdjizhi.common.CommonConfig;
 import com.zdjizhi.common.DosSketchLog;
 import com.zdjizhi.source.DosSketchSource;
 import com.zdjizhi.utils.FlinkEnvironmentUtils;
-import com.zdjizhi.utils.JsonMapper;
+//import com.zdjizhi.utils.JsonMapper;
 import com.zdjizhi.utils.StringUtil;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.FlatMapFunction;
@@ -23,9 +24,9 @@ import java.util.*;
 public class ParseSketchLog {
 
 private static Logger logger = LoggerFactory.getLogger(ParseSketchLog.class);
-private static JsonMapper jsonMapperInstance = JsonMapper.getInstance();
-private static JavaType hashmapJsonType = jsonMapperInstance.createCollectionType(HashMap.class, String.class, Object.class);
-private static JavaType listType = jsonMapperInstance.createCollectionType(ArrayList.class, HashMap.class);
+// private static JsonMapper jsonMapperInstance = JsonMapper.getInstance();
+// private static JavaType hashmapJsonType = jsonMapperInstance.createCollectionType(HashMap.class, String.class, Object.class);
+// private static JavaType listType = jsonMapperInstance.createCollectionType(ArrayList.class, HashMap.class);
 
 
 public static SingleOutputStreamOperator<DosSketchLog> getSketchSource(){
@@ -47,12 +48,15 @@
 public void flatMap(String s, Collector<DosSketchLog> collector) {
 try {
 if (StringUtil.isNotBlank(s)){
-HashMap<String, Object> sketchSource = jsonMapperInstance.fromJson(s, hashmapJsonType);
+HashMap<String, Object> sketchSource = JSONObject.parseObject(s, HashMap.class);
+// HashMap<String, Object> sketchSource = jsonMapperInstance.fromJson(s, hashmapJsonType);
 long sketchStartTime = Long.parseLong(sketchSource.get("sketch_start_time").toString());
 long sketchDuration = Long.parseLong(sketchSource.get("sketch_duration").toString());
 String attackType = sketchSource.get("attack_type").toString();
 int vsysId = Integer.parseInt(sketchSource.getOrDefault("common_vsys_id", 1).toString());
-ArrayList<HashMap<String, Object>> reportIpList = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(sketchSource.get("report_ip_list")), listType);
+String report_ip_list = JSONObject.toJSONString(sketchSource.get("report_ip_list"));
+ArrayList<HashMap<String, Object>> reportIpList = JSONObject.parseObject(report_ip_list, ArrayList.class);
+// ArrayList<HashMap<String, Object>> reportIpList = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(sketchSource.get("report_ip_list")), listType);
 for (HashMap<String, Object> obj : reportIpList) {
 DosSketchLog dosSketchLog = new DosSketchLog();
 dosSketchLog.setSketch_start_time(sketchStartTime);
```
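The flatMap now parses the raw sketch JSON and its report_ip_list with fastjson2 instead of the Jackson-based JsonMapper, still treating the list entries as raw HashMaps. A sketch of the same parsing done with typed fastjson2 calls, using a hypothetical ReportIp POJO rather than the project's classes:

```java
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;

import java.util.List;

public class SketchParseSketch {
    // Hypothetical shape of one entry in report_ip_list.
    public static class ReportIp {
        public String source_ip;
        public String destination_ip;
        public long sketch_sessions;
    }

    public static void main(String[] args) {
        String s = "{\"sketch_start_time\":1686277141,\"attack_type\":\"DNS Flood\","
                + "\"report_ip_list\":[{\"source_ip\":\"23.91.128.115\","
                + "\"destination_ip\":\"102.219.30.33\",\"sketch_sessions\":945}]}";

        JSONObject sketch = JSON.parseObject(s);
        long start = sketch.getLongValue("sketch_start_time");
        String attackType = sketch.getString("attack_type");

        // Typed parse of the nested list, instead of ArrayList<HashMap<String,Object>>;
        // same string round-trip the hunk uses for report_ip_list.
        List<ReportIp> list = JSON.parseArray(
                JSON.toJSONString(sketch.get("report_ip_list")), ReportIp.class);

        System.out.println(start + " " + attackType + " " + list.get(0).destination_ip);
    }
}
```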
ParseStaticThreshold.java

```diff
@@ -2,12 +2,14 @@ package com.zdjizhi.etl;
 
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
-import com.fasterxml.jackson.databind.JavaType;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONObject;
+//import com.fasterxml.jackson.databind.JavaType;
 import com.zdjizhi.common.CommonConfig;
 import com.zdjizhi.common.DosDetectionThreshold;
 import com.zdjizhi.common.DosVsysId;
 import com.zdjizhi.utils.HttpClientUtils;
-import com.zdjizhi.utils.JsonMapper;
+//import com.zdjizhi.utils.JsonMapper;
 import com.zdjizhi.utils.NacosUtils;
 import inet.ipaddr.IPAddress;
 import inet.ipaddr.IPAddressString;
@@ -19,6 +21,7 @@ import org.apache.http.message.BasicHeader;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -29,10 +32,10 @@ public class ParseStaticThreshold {
 private static final Log logger = LogFactory.get();
 private static String encryptpwd;
 
-private static JsonMapper jsonMapperInstance = JsonMapper.getInstance();
-private static JavaType hashmapJsonType = jsonMapperInstance.createCollectionType(HashMap.class, String.class, Object.class);
-private static JavaType thresholdType = jsonMapperInstance.createCollectionType(ArrayList.class, DosDetectionThreshold.class);
-private static JavaType vsysIDType = jsonMapperInstance.createCollectionType(ArrayList.class, DosVsysId.class);
+// private static JsonMapper jsonMapperInstance = JsonMapper.getInstance();
+// private static JavaType hashmapJsonType = jsonMapperInstance.createCollectionType(HashMap.class, String.class, Object.class);
+// private static JavaType thresholdType = jsonMapperInstance.createCollectionType(ArrayList.class, DosDetectionThreshold.class);
+// private static JavaType vsysIDType = jsonMapperInstance.createCollectionType(ArrayList.class, DosVsysId.class);
 
 static {
 //load the encrypted login password
@@ -51,11 +54,14 @@ public class ParseStaticThreshold {
 HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_ENCRYPTPWD_PATH, parms);
 String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build());
 if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) {
-HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType);
+HashMap<String, Object> resposeMap = JSONObject.parseObject(resposeJsonStr, HashMap.class);
+// HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType);
 boolean success = (boolean) resposeMap.get("success");
 String msg = resposeMap.get("msg").toString();
 
 if (success) {
-HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
+HashMap<String, Object> data = JSONObject.parseObject(JSONObject.toJSONString(resposeMap.get("data")), HashMap.class);
+// HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
 psw = data.get("encryptpwd").toString();
 } else {
 logger.error(msg);
@@ -85,11 +91,13 @@ public class ParseStaticThreshold {
 HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_LOGIN_PATH, parms);
 String resposeJsonStr = HttpClientUtils.httpPost(uriBuilder.build(), null);
 if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) {
-HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType);
+HashMap<String, Object> resposeMap = JSONObject.parseObject(resposeJsonStr, HashMap.class);
+// HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType);
 boolean success = (boolean) resposeMap.get("success");
 String msg = resposeMap.get("msg").toString();
 if (success) {
-HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
+HashMap<String, Object> data = JSONObject.parseObject(JSONObject.toJSONString(resposeMap.get("data")), HashMap.class);
+// HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
 token = data.get("token").toString();
 } else {
 logger.error(msg);
@@ -122,14 +130,21 @@ public class ParseStaticThreshold {
 BasicHeader authorization1 = new BasicHeader("Content-Type", "application/x-www-form-urlencoded");
 String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build(), authorization, authorization1);
 if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) {
-HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType);
+HashMap<String, Object> resposeMap = JSONObject.parseObject(resposeJsonStr, HashMap.class);
+// HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType);
 boolean success = (boolean) resposeMap.get("success");
 String msg = resposeMap.get("msg").toString();
 if (success) {
-HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
+HashMap<String, Object> data = JSONObject.parseObject(JSONObject.toJSONString(resposeMap.get("data")), HashMap.class);
+// HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
 Object list = data.get("list");
 if (list != null) {
-vsysIdList = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(list), vsysIDType);
+String s = JSONObject.toJSONString(list);
+List<DosVsysId> dosVsysIds = JSON.parseArray(JSONObject.toJSONString(list), DosVsysId.class);
+// vsysIdList= JSONObject.parseObject(JSONObject.toJSONString(list), DosVsysId.class);
+vsysIdList= (ArrayList)dosVsysIds;
+
+// vsysIdList = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(list), vsysIDType);
 logger.info("fetched {} vsysId records", vsysIdList.size());
 } else {
 logger.warn("vsysIdList is empty");
@@ -171,14 +186,19 @@ public class ParseStaticThreshold {
 BasicHeader authorization1 = new BasicHeader("Content-Type", "application/x-www-form-urlencoded");
 String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build(), authorization, authorization1);
 if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) {
-HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType);
+// HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType);
+HashMap<String, Object> resposeMap = JSONObject.parseObject(resposeJsonStr,HashMap.class);
 boolean success = (boolean) resposeMap.get("success");
 String msg = resposeMap.get("msg").toString();
 if (success) {
-HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
+// HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
+HashMap<String, Object> data = JSONObject.parseObject(JSONObject.toJSONString(resposeMap.get("data")), HashMap.class);
 Object list = data.get("list");
 if (list != null) {
-ArrayList<DosDetectionThreshold> thresholds = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(list), thresholdType);
+// ArrayList<DosDetectionThreshold> thresholds = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(list), thresholdType);
+// ArrayList<DosDetectionThreshold> thresholds = JSONObject.parseObject(JSONObject.toJSONString(list), ArrayList.class);
+List<DosDetectionThreshold> dosDetectionThresholds = JSON.parseArray(JSONObject.toJSONString(list), DosDetectionThreshold.class);
+ArrayList<DosDetectionThreshold> thresholds = (ArrayList)dosDetectionThresholds;
 for (DosDetectionThreshold dosDetectionThreshold : thresholds) {
 dosDetectionThreshold.setSuperiorIds(superiorIds);
 vsysThresholds.add(dosDetectionThreshold);
```
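Every HTTP call above repeats the same pattern: parse the response with fastjson2, check the success flag, log msg on failure, and otherwise re-serialize and re-parse the data node into a target type. A small hedged sketch of that pattern; the helper itself is hypothetical, only the success/msg/data field names and fastjson2 calls come from the hunks:

```java
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;

public class ResponseParseSketch {

    /**
     * Parse a {"success":..., "msg":..., "data":{...}} response and return the
     * "data" node mapped onto the given type, or null when success is false.
     */
    static <T> T parseData(String responseJson, Class<T> dataType) {
        JSONObject response = JSON.parseObject(responseJson);
        if (!response.getBooleanValue("success")) {
            System.err.println(response.getString("msg"));
            return null;
        }
        // Same round-trip the hunks use: data node -> JSON string -> target type.
        return JSON.parseObject(JSON.toJSONString(response.get("data")), dataType);
    }

    public static void main(String[] args) {
        String ok = "{\"success\":true,\"msg\":\"ok\",\"data\":{\"token\":\"abc\"}}";
        JSONObject data = parseData(ok, JSONObject.class);
        System.out.println(data.getString("token"));
    }
}
```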
DosEventSink.java

```diff
@@ -1,8 +1,9 @@
 package com.zdjizhi.sink;
 
+import com.alibaba.fastjson2.JSONObject;
 import com.zdjizhi.common.CommonConfig;
 import com.zdjizhi.common.DosEventLog;
-import com.zdjizhi.utils.JsonMapper;
+//import com.zdjizhi.utils.JsonMapper;
 import com.zdjizhi.utils.KafkaUtils;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 
@@ -13,7 +14,8 @@ class DosEventSink {
 static void dosEventOutputSink(SingleOutputStreamOperator<DosEventLog> dosEventLogOutputStream){
 dosEventLogOutputStream
 .filter(Objects::nonNull)
-.map(JsonMapper::toJsonString)
+// .map(JsonMapper::toJsonString)
+.map(JSONObject::toJSONString)
 .addSink(KafkaUtils.getKafkaSink(CommonConfig.KAFKA_OUTPUT_EVENT_TOPIC_NAME))
 .setParallelism(CommonConfig.KAFKA_OUTPUT_EVENT_PARALLELISM);
 }
```
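The sink now serializes events with fastjson2's JSONObject::toJSONString instead of the old JsonMapper before handing them to Kafka. A compact sketch of that chain; the sink and parallelism value are placeholders for the KafkaUtils/CommonConfig ones:

```java
import com.alibaba.fastjson2.JSONObject;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.util.Objects;

class EventSinkSketch {
    // Mirrors dosEventOutputSink after the change: drop nulls, serialize with
    // fastjson2, then hand the JSON strings to a sink.
    static <T> void toJsonSink(SingleOutputStreamOperator<T> events, SinkFunction<String> sink) {
        events.filter(Objects::nonNull)
              .map(JSONObject::toJSONString)
              .addSink(sink)
              .setParallelism(1);
    }
}
```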
OutputStreamSink.java

```diff
@@ -29,7 +29,7 @@ import java.util.Properties;
 * @author 94976
 */
 public class OutputStreamSink {
-// private static final Logger logger = LoggerFactory.getLogger(OutputStreamSink.class);
+// private static final Logger logger = LoggerFactory.getLogger(OutputStreamSink.class);
 private static final Log logger = LogFactory.get();
 
 public static OutputTag<DosMetricsLog> outputTag = new OutputTag<DosMetricsLog>("traffic server ip metrics"){};
@@ -46,7 +46,7 @@ public class OutputStreamSink {
 }
 
 private static SingleOutputStreamOperator<DosEventLog> getEventSinkStream(SingleOutputStreamOperator<DosSketchLog> middleStream){
-DataStreamSource<Map<String, byte[]>> broadcastSource=null;
+DataStreamSource<Map<String, String>> broadcastSource=null;
 Properties nacosProperties = new Properties();
 
 nacosProperties.put(PropertyKeyConst.SERVER_ADDR,CommonConfig.NACOS_SERVER_ADDR);
@@ -55,7 +55,7 @@ public class OutputStreamSink {
 nacosProperties.setProperty(PropertyKeyConst.NAMESPACE, CommonConfig.NACOS_NAMESPACE);
 
 if ("CLUSTER".equals(CommonConfig.CLUSTER_OR_SINGLE)){
-broadcastSource = DosSketchSource.broadcastSource(nacosProperties,CommonConfig.HDFS_PATH);
+broadcastSource = DosSketchSource.broadcastSource(nacosProperties);
 }else {
 broadcastSource= DosSketchSource.singleBroadcastSource(nacosProperties);
 }
@@ -63,7 +63,7 @@ public class OutputStreamSink {
 MapStateDescriptor<String,Map> descriptor =
 new MapStateDescriptor<>("descriptorTest", Types.STRING, TypeInformation.of(Map.class));
 
-BroadcastStream<Map<String, byte[]>> broadcast = broadcastSource.broadcast(descriptor);
+BroadcastStream<Map<String, String>> broadcast = broadcastSource.broadcast(descriptor);
 
 return middleStream
 .connect(broadcast)
```
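getEventSinkStream now broadcasts a Map<String, String> of knowledge-file metadata (file name to sha256) instead of raw byte[] payloads and connects it to the main sketch stream. A minimal sketch of that Flink broadcast wiring; the streams passed in and the helper name are stand-ins, not the project's classes:

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;

import java.util.Map;

public class BroadcastWiringSketch {
    /**
     * Wire a data stream to a broadcast stream of knowledge metadata
     * (file name -> sha256), as getEventSinkStream does after this change.
     * The caller then attaches a BroadcastProcessFunction such as DosDetection.
     */
    static <T> BroadcastConnectedStream<T, Map<String, String>> wire(
            DataStream<T> mainStream, DataStream<Map<String, String>> metaStream) {
        MapStateDescriptor<String, Map> descriptor =
                new MapStateDescriptor<>("descriptorTest", Types.STRING, TypeInformation.of(Map.class));
        BroadcastStream<Map<String, String>> broadcast = metaStream.broadcast(descriptor);
        return mainStream.connect(broadcast);
    }
}
```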
TrafficServerIpMetricsSink.java

```diff
@@ -1,5 +1,6 @@
 package com.zdjizhi.sink;
 
+import com.alibaba.fastjson2.JSONObject;
 import com.zdjizhi.common.CommonConfig;
 import com.zdjizhi.common.DosMetricsLog;
 import com.zdjizhi.common.DosSketchLog;
@@ -14,7 +15,8 @@ class TrafficServerIpMetricsSink {
 
 static void sideOutputMetricsSink(SingleOutputStreamOperator<DosSketchLog> outputStream){
 DataStream<DosMetricsLog> sideOutput = outputStream.getSideOutput(outputTag);
-sideOutput.map(JsonMapper::toJsonString).addSink(KafkaUtils.getKafkaSink(CommonConfig.KAFKA_OUTPUT_METRIC_TOPIC_NAME))
+// sideOutput.map(JsonMapper::toJsonString).addSink(KafkaUtils.getKafkaSink(CommonConfig.KAFKA_OUTPUT_METRIC_TOPIC_NAME))
+sideOutput.map(JSONObject::toJSONString).addSink(KafkaUtils.getKafkaSink(CommonConfig.KAFKA_OUTPUT_METRIC_TOPIC_NAME))
 .setParallelism(CommonConfig.KAFKA_OUTPUT_METRIC_PARALLELISM);
 
```
DosSketchSource.java

```diff
@@ -36,11 +36,11 @@ public class DosSketchSource {
 }
 
 
-public static DataStreamSource<Map<String, byte[]>> broadcastSource(Properties nacosProperties, String STORE_PATH){
-return streamExeEnv.addSource(new HttpSource(nacosProperties, CommonConfig.NACOS_DATA_ID, CommonConfig.NACOS_GROUP, CommonConfig.NACOS_READ_TIMEOUT,STORE_PATH));
+public static DataStreamSource<Map<String, String>> broadcastSource(Properties nacosProperties){
+return streamExeEnv.addSource(new HttpSource(nacosProperties, CommonConfig.NACOS_DATA_ID, CommonConfig.NACOS_GROUP, CommonConfig.NACOS_READ_TIMEOUT));
 }
 
-public static DataStreamSource<Map<String, byte[]>> singleBroadcastSource(Properties nacosProperties){
+public static DataStreamSource<Map<String, String>>singleBroadcastSource(Properties nacosProperties){
 return streamExeEnv.addSource(new SingleHttpSource(nacosProperties, CommonConfig.NACOS_DATA_ID, CommonConfig.NACOS_GROUP, CommonConfig.NACOS_READ_TIMEOUT));
 }
 }
```
HttpSource.java

```diff
@@ -2,11 +2,14 @@ package com.zdjizhi.source;
 
 import cn.hutool.core.io.FileUtil;
 import cn.hutool.core.io.IoUtil;
 import cn.hutool.core.io.file.FileReader;
 import cn.hutool.crypto.digest.DigestUtil;
 import cn.hutool.json.JSONObject;
 import com.alibaba.nacos.api.NacosFactory;
 import com.alibaba.nacos.api.PropertyKeyConst;
 import com.alibaba.nacos.api.config.ConfigService;
 import com.alibaba.nacos.api.config.listener.Listener;
 import com.alibaba.nacos.api.exception.NacosException;
 import com.fasterxml.jackson.databind.JavaType;
 import com.google.common.base.Joiner;
 import com.jayway.jsonpath.JsonPath;
@@ -27,12 +30,19 @@ import java.util.*;
 import java.util.concurrent.Executor;
 
 
-public class HttpSource extends RichHttpSourceFunction<Map<String, byte[]>> {
 
+public class HttpSource extends RichHttpSourceFunction<Map<String, String>> {
 private static final Logger logger = LoggerFactory.getLogger(HttpSource.class);
 
 private static final String EXPR = "$.[?(@.version=='latest' && @.name in ['ip_v4_built_in','ip_v6_built_in','ip_v4_user_defined','ip_v6_user_defined'])].['name','sha256','format','path']";
 
+private static Map<String, String> knowledgeMetaCache = new HashMap<>();
 
+private static HashMap<String, String> knowledgeUpdateCache;
 
+private static final int TRY_TIMES = 3;
 
+private static HttpClientUtils2 httpClientUtils;
 
 //nacos connection config
 private Properties nacosProperties;
 
@@ -45,48 +55,53 @@ public class HttpSource extends RichHttpSourceFunction<Map<String, byte[]>> {
 //nacos connection timeout
 private long NACOS_READ_TIMEOUT;
 
-//path for uploads to HDFS
-private String STORE_PATH;
 
 private ConfigService configService;
 
 // private static JsonMapper jsonMapperInstance = JsonMapper.getInstance();
 // private static JavaType listType = jsonMapperInstance.createCollectionType(List.class, KnowledgeLog.class);
-private static Map<String, String> updateMap = new HashMap<>();
-private static HashMap<String, byte[]> knowledgeFileCache;
 private static Header header;
 
 //set to false when the source is cancelled
 private boolean isRunning = true;
 //whether to emit downstream; defaults to not sending
 private boolean isSending = false;
 
 
-public HttpSource(Properties nacosProperties, String NACOS_DATA_ID, String NACOS_GROUP, long NACOS_READ_TIMEOUT, String storePath) {
+public HttpSource(Properties nacosProperties, String NACOS_DATA_ID, String NACOS_GROUP, long NACOS_READ_TIMEOUT) {
 this.nacosProperties = nacosProperties;
 this.NACOS_DATA_ID = NACOS_DATA_ID;
 this.NACOS_GROUP = NACOS_GROUP;
 this.NACOS_READ_TIMEOUT = NACOS_READ_TIMEOUT;
-this.STORE_PATH = storePath;
 }
 
 @Override
 public void open(Configuration parameters) throws Exception {
 super.open(parameters);
+httpClientUtils = new HttpClientUtils2();
 //initialize the metadata cache
-updateMap = new HashMap<>(16);
+knowledgeMetaCache = new HashMap<>(16);
 //initialize the geolocation (knowledge) file cache
-knowledgeFileCache = new HashMap<>(16);
+knowledgeUpdateCache = new HashMap<>(16);
 
+header = new BasicHeader("token", CommonConfig.HOS_TOKEN);
+//connect to nacos
+try {
+configService = NacosFactory.createConfigService(nacosProperties);
+}catch (NacosException e){
+logger.error("Get Schema config from Nacos error,The exception message is :{}", e.getMessage());
+}
 
+//initialize the knowledge base
+initKnowledge();
 logger.info("Connecting to nacos:" + nacosProperties.getProperty(PropertyKeyConst.SERVER_ADDR));
-configService = NacosFactory.createConfigService(nacosProperties);
 
 
 }
 @Override
 public void run(SourceContext ctx) throws Exception {
 // ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
 String config = configService.getConfig(NACOS_DATA_ID, NACOS_GROUP, NACOS_READ_TIMEOUT);
 SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 String format = formatter.format(new Date());
 logger.info(format + "receive config from nacos:" + config);
 System.out.println(format + "receive config from nacos:" + config);
 if (StringUtil.isNotBlank(config)) {
 ArrayList<Object> metaList = JsonPath.parse(config).read(EXPR);
-loadKnowledge(metaList);
+if (!knowledgeUpdateCache.isEmpty()){
+ctx.collect(knowledgeUpdateCache);
+knowledgeUpdateCache.clear();
+}
 // }
 
 
 configService.addListener(NACOS_DATA_ID, NACOS_GROUP, new Listener() {
@@ -101,78 +116,141 @@ public class HttpSource extends RichHttpSourceFunction<Map<String, byte[]>> {
 logger.info("receive update config:" + configMsg);
 if (StringUtil.isNotBlank(configMsg)) {
 ArrayList<Object> metaList = JsonPath.parse(configMsg).read(EXPR);
-if (metaList.size() >= 1) {
+if (metaList.size() > 0) {
 for (Object metadata : metaList) {
 JSONObject knowledgeJson = new JSONObject(metadata, false, true);
 String fileName = Joiner.on(CommonConfig.LOCATION_SEPARATOR).useForNull("").join(knowledgeJson.getStr("name"),
 knowledgeJson.getStr("format"));
 String sha256 = knowledgeJson.getStr("sha256");
 String filePath = knowledgeJson.getStr("path");
-if (!sha256.equals(updateMap.get(fileName))) {
-updateMap.put(fileName, sha256);
-updateKnowledge(fileName, filePath);
+if (!sha256.equals(knowledgeMetaCache.get(fileName))) {
+knowledgeMetaCache.put(fileName, sha256);
+updateKnowledge(fileName, filePath,sha256);
 }
 
 }
-ctx.collect(knowledgeFileCache);
+if (!knowledgeUpdateCache.isEmpty()){
+ctx.collect(knowledgeUpdateCache);
+knowledgeUpdateCache.clear();
+}
 }
 }
 
 } catch (Exception e) {
 logger.error("Failed to listen for nacos config updates", e);
 }
 System.out.println(configMsg);
 }
 });
 
 while (isRunning) {
-Thread.sleep(10000);
+try {
+Thread.sleep(10000);
+}catch (InterruptedException e) {
+e.printStackTrace();
+}
 }
 
 }
 
-private void loadKnowledge(ArrayList<Object> metaList) {
-InputStream inputStream = null;
+private void initKnowledge(){
+String configMsg = "";
 try {
-if (metaList.size() >= 1) {
+configMsg=configService.getConfig(NACOS_DATA_ID, NACOS_GROUP, NACOS_READ_TIMEOUT);
+} catch (NacosException e) {
+logger.error("Exception while fetching the knowledge-base metadata config from Nacos, message: {}", e.getMessage());
+}
 
+if (StringUtil.isNotBlank(configMsg)){
+ArrayList<Object> metaList = JsonPath.parse(configMsg).read(EXPR);
+if (metaList.size() > 0) {
 for (Object metadata : metaList) {
 JSONObject knowledgeJson = new JSONObject(metadata, false, true);
 String fileName = Joiner.on(CommonConfig.LOCATION_SEPARATOR).useForNull("").join(knowledgeJson.getStr("name"),
 knowledgeJson.getStr("format"));
 String sha256 = knowledgeJson.getStr("sha256");
 String filePath = knowledgeJson.getStr("path");
-Header header = new BasicHeader("token", CommonConfig.HOS_TOKEN);
-HttpClientUtils2 httpClientUtils = new HttpClientUtils2();
-inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header);
-updateMap.put(fileName, sha256);
-knowledgeFileCache.put(fileName, IOUtils.toByteArray(inputStream));
+byte[] localFileByte = getLocalFile(fileName);
+String localFileSha256Hex = DigestUtil.sha256Hex(localFileByte);
+if (sha256.equals(localFileSha256Hex)){
+logger.info("Local file {} has sha256 {}, Nacos records {}; sha256 values match", fileName, localFileSha256Hex, sha256);
+knowledgeMetaCache.put(fileName, sha256);
+}else {
+logger.info("Local file {} has sha256 {}, Nacos records {}; sha256 values differ, updating the local file and cache", fileName, localFileSha256Hex, sha256);
+updateKnowledge(fileName,filePath,sha256);
+}
 
 }
 }
 }
 }
 
+private void updateKnowledge(String fileName, String filePath,String sha256) {
 InputStream inputStream = null;
+int retryNum = 0;
 try {
+while (retryNum < TRY_TIMES){
+inputStream = httpClientUtils.httpGetInputStream(filePath, 90000, header);
+if (inputStream !=null){
+byte[] downloadBytes = IOUtils.toByteArray(inputStream);
+String downloadFileSha256Hex = DigestUtil.sha256Hex(downloadBytes);
+if (sha256.equals(downloadFileSha256Hex)&& downloadBytes.length > 0 ){
+logger.info("Downloaded {} from HOS with sha256 {}, Nacos records {}; sha256 values match", fileName, sha256);
+boolean updateStatus = updateLocalFile(fileName, downloadBytes);
+if (updateStatus){
+knowledgeMetaCache.put(fileName,sha256);
+knowledgeUpdateCache.put(fileName, sha256);
+retryNum = TRY_TIMES;
+}else {
+retryNum++;
+//avoid hammering HOS with requests
+Thread.sleep(10000);
+}
+}else {
+logger.error("Downloaded {} from HOS with sha256 {}, Nacos records {}; sha256 values differ, starting download retry #{}", fileName, downloadFileSha256Hex, sha256, retryNum);
+retryNum++;
+//avoid hammering HOS with requests
+Thread.sleep(10000);
+}
+}
+}
 } catch (IOException ioException) {
 ioException.printStackTrace();
+} catch (InterruptedException e) {
+e.printStackTrace();
 } finally {
 IOUtils.closeQuietly(inputStream);
 }
 }
 
 
-private void updateKnowledge(String fileName, String filePath) {
-InputStream inputStream = null;
+private boolean updateLocalFile(String fileName,byte[] downloadBytes) {
+FileOutputStream outputStream = null;
+boolean updateStatus = false;
 try {
-Header header = new BasicHeader("token", CommonConfig.HOS_TOKEN);
-HttpClientUtils2 httpClientUtils = new HttpClientUtils2();
-inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header);
-byte[] bytes = IOUtils.toByteArray(inputStream);
-HdfsUtils.uploadFileByBytes(CommonConfig.HDFS_PATH + fileName, bytes);
-knowledgeFileCache.put(fileName, bytes);
-} catch (IOException ioException) {
-ioException.printStackTrace();
+HdfsUtils.uploadFileByBytes(CommonConfig.HDFS_PATH + fileName, downloadBytes);
+updateStatus=true;
+} catch (IOException ioe) {
+logger.error("IO exception while updating local file {}, message:", fileName, ioe.getMessage());
+ioe.printStackTrace();
+} catch (RuntimeException e) {
+logger.error("Exception while updating local file {}, message:", fileName, e.getMessage());
+e.printStackTrace();
 } finally {
 IOUtils.closeQuietly(inputStream);
 IOUtils.closeQuietly(outputStream);
 }
+return updateStatus;
 }
 
+private static byte[] getLocalFile(String name) {
+byte[] fileBytes = null;
+try {
+fileBytes = HdfsUtils.getFileBytes(CommonConfig.HDFS_PATH + name) ;
+} catch (RuntimeException | IOException e) {
+logger.error("IpLookupUtils download MMDB files error, message is:" + e.getMessage());
+e.printStackTrace();
+}
+return fileBytes;
+}
 
 @Override
 public void cancel() {
 this.isRunning = false;
```
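The reworked HttpSource keeps only file-name to sha256 metadata in the broadcast element, verifies every HOS download against the sha256 recorded in Nacos, and retries up to TRY_TIMES with a pause between attempts. A condensed, hedged sketch of that verify-and-retry loop using hutool's DigestUtil; the download function is abstracted behind a Supplier, and names other than DigestUtil.sha256Hex are illustrative:

```java
import cn.hutool.crypto.digest.DigestUtil;

import java.util.function.Supplier;

public class ChecksumRetrySketch {
    private static final int TRY_TIMES = 3;

    /**
     * Download bytes until their sha256 matches the expected value from Nacos,
     * or give up after TRY_TIMES attempts. Returns null on failure.
     */
    static byte[] downloadVerified(Supplier<byte[]> download, String expectedSha256)
            throws InterruptedException {
        for (int attempt = 0; attempt < TRY_TIMES; attempt++) {
            byte[] bytes = download.get();
            if (bytes != null && bytes.length > 0
                    && expectedSha256.equals(DigestUtil.sha256Hex(bytes))) {
                return bytes;   // checksum matches: safe to persist and broadcast
            }
            Thread.sleep(10_000); // avoid hammering HOS, as the source does
        }
        return null;
    }
}
```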
SingleHttpSource.java

```diff
@@ -2,11 +2,14 @@ package com.zdjizhi.source;
 
 import cn.hutool.core.io.FileUtil;
 import cn.hutool.core.io.IoUtil;
 import cn.hutool.core.io.file.FileReader;
 import cn.hutool.crypto.digest.DigestUtil;
 import cn.hutool.json.JSONObject;
 import com.alibaba.nacos.api.NacosFactory;
 import com.alibaba.nacos.api.PropertyKeyConst;
 import com.alibaba.nacos.api.config.ConfigService;
 import com.alibaba.nacos.api.config.listener.Listener;
 import com.alibaba.nacos.api.exception.NacosException;
 import com.fasterxml.jackson.databind.JavaType;
 import com.google.common.base.Joiner;
 import com.jayway.jsonpath.JsonPath;
@@ -23,13 +26,22 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.*;
 import java.text.SimpleDateFormat;
 import java.util.*;
 import java.util.concurrent.Executor;
 
-public class SingleHttpSource extends RichHttpSourceFunction<Map<String, byte[]>> {
+public class SingleHttpSource extends RichHttpSourceFunction<Map<String, String>> {
 
 private static final Logger logger = LoggerFactory.getLogger(SingleHttpSource.class);
-private static HashMap<String, byte[]> knowledgeFileCache;
 
+private static final String EXPR = "$.[?(@.version=='latest' && @.name in ['ip_v4_built_in','ip_v6_built_in','ip_v4_user_defined','ip_v6_user_defined'])].['name','sha256','format','path']";
 
 
+private static Map<String, String> knowledgeMetaCache = new HashMap<>();
 
+private static HashMap<String, String> knowledgeUpdateCache;
 
+private static final int TRY_TIMES = 3;
 
 private Properties nacosProperties;
 
@@ -39,20 +51,16 @@ public class SingleHttpSource extends RichHttpSourceFunction<Map<String, byte[]>
 
 private long NACOS_READ_TIMEOUT;
 
-private static String STORE_PATH;
+private static HttpClientUtils2 httpClientUtils ;
 
 private ConfigService configService;
 
 // private static JsonMapper jsonMapperInstance = JsonMapper.getInstance();
 // private static JavaType listType = jsonMapperInstance.createCollectionType(List.class, KnowledgeLog.class);
-private static final String EXPR = "$.[?(@.version=='latest' && @.name in ['ip_v4_built_in','ip_v6_built_in','ip_v4_user_defined','ip_v6_user_defined'])].['name','sha256','format','path']";
 
 
-private static Map<String, String> updateMap = new HashMap<>();
 private static Header header;
 
 private boolean isRunning = true;
 
 
 
 public SingleHttpSource(Properties nacosProperties, String NACOS_DATA_ID, String NACOS_GROUP, long NACOS_READ_TIMEOUT) {
 this.nacosProperties = nacosProperties;
 this.NACOS_DATA_ID = NACOS_DATA_ID;
@@ -65,33 +73,32 @@ public class SingleHttpSource extends RichHttpSourceFunction<Map<String, byte[]>
 @Override
 public void open(Configuration parameters) throws Exception {
 super.open(parameters);
-logger.info("Connecting to nacos:" + nacosProperties.getProperty(PropertyKeyConst.SERVER_ADDR));
-configService = NacosFactory.createConfigService(nacosProperties);
+httpClientUtils = new HttpClientUtils2();
 //initialize the metadata cache
-updateMap = new HashMap<>(16);
+knowledgeMetaCache = new HashMap<>(16);
 //initialize the geolocation (knowledge) file cache
-knowledgeFileCache = new HashMap<>(16);
+knowledgeUpdateCache = new HashMap<>(16);
 
+header = new BasicHeader("token", CommonConfig.HOS_TOKEN);
 
+//connect to nacos
+try {
+configService = NacosFactory.createConfigService(nacosProperties);
+}catch (NacosException e){
+logger.error("Get Schema config from Nacos error,The exception message is :{}", e.getMessage());
+}
 
+//initialize the knowledge base
+initKnowledge();
+logger.info("Connecting to nacos:" + nacosProperties.getProperty(PropertyKeyConst.SERVER_ADDR));
 }
 
 @Override
 public void run(SourceContext ctx) throws Exception {
 // ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
 String config = configService.getConfig(NACOS_DATA_ID, NACOS_GROUP, NACOS_READ_TIMEOUT);
 // List<CustomFile> customFiles = new ArrayList<>();
 if (StringUtil.isNotBlank(config)) {
 ArrayList<Object> metaList = JsonPath.parse(config).read(EXPR);
-loadKnowledge(metaList);
+if (!knowledgeUpdateCache.isEmpty()){
+ctx.collect(knowledgeUpdateCache);
+knowledgeUpdateCache.clear();
+}
 // if (StringUtil.isNotBlank(config)) {
 // List<KnowledgeLog> knowledgeLogListList = jsonMapperInstance.fromJson(config, listType);
 // if (knowledgeLogListList.size()>=1){
 // for (KnowledgeLog knowledgeLog : knowledgeLogListList) {
 // String name = knowledgeLog.getName().concat(".").concat(knowledgeLog.getFormat());
 // String sha256 = knowledgeLog.getSha256();
 // updateMap.put(name,sha256);
 // }
 // }
 // }
 
 configService.addListener(NACOS_DATA_ID, NACOS_GROUP, new Listener() {
 @Override
@@ -105,20 +112,22 @@ public class SingleHttpSource extends RichHttpSourceFunction<Map<String, byte[]>
 logger.info("receive update config:" + configMsg);
 if (StringUtil.isNotBlank(configMsg)) {
 ArrayList<Object> metaList = JsonPath.parse(configMsg).read(EXPR);
-if (metaList.size() >= 1) {
+if (metaList.size() > 0) {
 for (Object metadata : metaList) {
 JSONObject knowledgeJson = new JSONObject(metadata, false, true);
 String fileName = Joiner.on(CommonConfig.LOCATION_SEPARATOR).useForNull("").join(knowledgeJson.getStr("name"),
 knowledgeJson.getStr("format"));
 String sha256 = knowledgeJson.getStr("sha256");
 String filePath = knowledgeJson.getStr("path");
-if (!sha256.equals(updateMap.get(fileName))) {
-updateMap.put(fileName, sha256);
-updateKnowledge(fileName, filePath);
+if (!sha256.equals(knowledgeMetaCache.get(fileName))) {
+knowledgeMetaCache.put(fileName, sha256);
+updateKnowledge(fileName, filePath,sha256);
 }
 
 }
-ctx.collect(knowledgeFileCache);
+if (!knowledgeUpdateCache.isEmpty()){
+ctx.collect(knowledgeUpdateCache);
+knowledgeUpdateCache.clear();
+}
 }
 }
 
@@ -130,85 +139,129 @@ public class SingleHttpSource extends RichHttpSourceFunction<Map<String, byte[]>
 });
 
 while (isRunning) {
-Thread.sleep(10000);
+try {
+Thread.sleep(10000);
+}catch (InterruptedException e){
+e.printStackTrace();
+}
 
 }
 
 
 }
 
 // private CustomFile loadKnowledge(String fileName, String filePath) {
 // InputStream inputStream = null;
 // FileOutputStream outputStream = null;
 // CustomFile customFile = new CustomFile();
 // try {
 // customFile.setFileName(fileName);
 // Header header = new BasicHeader("token", CommonConfig.HOS_TOKEN);
 // HttpClientUtils2 httpClientUtils = new HttpClientUtils2();
 // inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header);
 // FileUtil.mkdir(CommonConfig.DOWNLOAD_PATH);
 // File file = new File(CommonConfig.DOWNLOAD_PATH.concat(File.separator).concat(fileName));
 // outputStream = new FileOutputStream(file);
 // byte[] bytes = IOUtils.toByteArray(inputStream);
 // customFile.setContent(bytes);
 // inputStream = new ByteArrayInputStream(customFile.getContent());
 // IoUtil.copy(inputStream, outputStream);
 //
 // } catch (IOException ioException) {
 // ioException.printStackTrace();
 // } finally {
 // IOUtils.closeQuietly(inputStream);
 // IOUtils.closeQuietly(outputStream);
 // }
 // return customFile;
 // }
-private void loadKnowledge(ArrayList<Object> metaList) {
-InputStream inputStream = null;
-try {
-if (metaList.size() >= 1) {
-for (Object metadata : metaList) {
-JSONObject knowledgeJson = new JSONObject(metadata, false, true);
-String fileName = Joiner.on(CommonConfig.LOCATION_SEPARATOR).useForNull("").join(knowledgeJson.getStr("name"),
-knowledgeJson.getStr("format"));
-String sha256 = knowledgeJson.getStr("sha256");
-String filePath = knowledgeJson.getStr("path");
-Header header = new BasicHeader("token", CommonConfig.HOS_TOKEN);
-HttpClientUtils2 httpClientUtils = new HttpClientUtils2();
-inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header);
-updateMap.put(fileName, sha256);
-knowledgeFileCache.put(fileName, IOUtils.toByteArray(inputStream));
 
 
+private void initKnowledge(){
+String configMsg = "";
+try {
+configMsg=configService.getConfig(NACOS_DATA_ID, NACOS_GROUP, NACOS_READ_TIMEOUT);
+} catch (NacosException e) {
+logger.error("Exception while fetching the knowledge-base metadata config from Nacos, message: {}", e.getMessage());
+}
 
+if (StringUtil.isNotBlank(configMsg)){
+ArrayList<Object> metaList = JsonPath.parse(configMsg).read(EXPR);
+if (metaList.size() > 0) {
+for (Object metadata : metaList) {
+JSONObject knowledgeJson = new JSONObject(metadata, false, true);
+String fileName = Joiner.on(CommonConfig.LOCATION_SEPARATOR).useForNull("").join(knowledgeJson.getStr("name"),
+knowledgeJson.getStr("format"));
+String sha256 = knowledgeJson.getStr("sha256");
+String filePath = knowledgeJson.getStr("path");
+byte[] localFileByte = getLocalFile(fileName);
+String localFileSha256Hex = DigestUtil.sha256Hex(localFileByte);
+if (sha256.equals(localFileSha256Hex)){
+logger.info("Local file {} has sha256 {}, Nacos records {}; sha256 values match", fileName, localFileSha256Hex, sha256);
+knowledgeMetaCache.put(fileName, sha256);
+}else {
+logger.info("Local file {} has sha256 {}, Nacos records {}; sha256 values differ, updating the local file and cache", fileName, localFileSha256Hex, sha256);
+updateKnowledge(fileName,filePath,sha256);
+}
 
 }
 }
 }
-} catch (IOException ioException) {
-ioException.printStackTrace();
-} finally {
-IOUtils.closeQuietly(inputStream);
-}
 }
 
 
-private void updateKnowledge(String fileName, String filePath) {
 
+private void updateKnowledge(String fileName, String filePath,String sha256) {
 InputStream inputStream = null;
 FileOutputStream outputStream = null;
+int retryNum = 0;
 try {
+while (retryNum < TRY_TIMES){
+inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header);
+if (inputStream !=null){
+byte[] downloadBytes = IOUtils.toByteArray(inputStream);
+String downloadFileSha256Hex = DigestUtil.sha256Hex(downloadBytes);
+if (sha256.equals(downloadFileSha256Hex)&& downloadBytes.length > 0 ){
+logger.info("Downloaded {} from HOS with sha256 {}, Nacos records {}; sha256 values match", fileName, sha256);
+boolean updateStatus = updateLocalFile(fileName, downloadBytes);
+if (updateStatus){
+knowledgeMetaCache.put(fileName,sha256);
+knowledgeUpdateCache.put(fileName, sha256);
+retryNum = TRY_TIMES;
+}else {
+retryNum++;
+//avoid hammering HOS with requests
+Thread.sleep(10000);
+}
 // isSending = true;
+}else {
+logger.error("Downloaded {} from HOS with sha256 {}, Nacos records {}; sha256 values differ, starting download retry #{}", fileName, downloadFileSha256Hex, sha256, retryNum);
+retryNum++;
+//avoid hammering HOS with requests
+Thread.sleep(10000);
+}
+}
+}
 
 } catch (IOException ioException) {
 ioException.printStackTrace();
+} catch (InterruptedException e) {
+e.printStackTrace();
 } finally {
 IOUtils.closeQuietly(inputStream);
 }
 }
 
 
 
+private boolean updateLocalFile(String fileName,byte[] downloadBytes) {
+FileOutputStream outputStream = null;
+boolean updateStatus = false;
 try {
-Header header = new BasicHeader("token", CommonConfig.HOS_TOKEN);
-HttpClientUtils2 httpClientUtils = new HttpClientUtils2();
-inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header);
 FileUtil.mkdir(CommonConfig.DOWNLOAD_PATH);
 File file = new File(CommonConfig.DOWNLOAD_PATH.concat(File.separator).concat(fileName));
 outputStream = new FileOutputStream(file);
-byte[] bytes = IOUtils.toByteArray(inputStream);
-knowledgeFileCache.put(fileName, bytes);
-inputStream=new ByteArrayInputStream(bytes);
-IoUtil.copy(inputStream, outputStream);
-} catch (IOException ioException) {
-ioException.printStackTrace();
+IoUtil.copy(new ByteArrayInputStream(downloadBytes), outputStream);
+updateStatus=true;
+} catch (IOException ioe) {
+logger.error("IO exception while updating local file {}, message:", fileName, ioe.getMessage());
+ioe.printStackTrace();
+} catch (RuntimeException e) {
+logger.error("Exception while updating local file {}, message:", fileName, e.getMessage());
+e.printStackTrace();
 } finally {
 IOUtils.closeQuietly(inputStream);
 IOUtils.closeQuietly(outputStream);
 }
+return updateStatus;
 }
 
+private static byte[] getLocalFile(String name) {
+byte[] fileBytes = null;
+try {
+fileBytes=new FileReader(CommonConfig.DOWNLOAD_PATH + name).readBytes();
+} catch (RuntimeException e) {
+logger.error("IpLookupUtils download MMDB files error, message is:" + e.getMessage());
+e.printStackTrace();
+}
+return fileBytes;
+}
 
 
 @Override
 public void cancel() {
 this.isRunning = false;
```
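SingleHttpSource mirrors HttpSource but persists the downloaded mmdb files to a local download directory instead of HDFS and reads them back with hutool's FileReader. A small sketch of that local write/read pair; the directory, file name, and sample bytes are placeholders:

```java
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.io.IoUtil;
import cn.hutool.core.io.file.FileReader;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;

public class LocalKnowledgeFileSketch {
    static void writeLocal(String downloadPath, String fileName, byte[] bytes) throws IOException {
        FileUtil.mkdir(downloadPath);                                   // ensure the directory exists
        File file = new File(downloadPath + File.separator + fileName);
        try (FileOutputStream out = new FileOutputStream(file)) {
            IoUtil.copy(new ByteArrayInputStream(bytes), out);          // same copy call as the source
        }
    }

    static byte[] readLocal(String downloadPath, String fileName) {
        return new FileReader(downloadPath + File.separator + fileName).readBytes();
    }

    public static void main(String[] args) throws IOException {
        writeLocal("/tmp/knowledge", "ip_v4_built_in.mmdb", new byte[]{1, 2, 3});
        System.out.println(readLocal("/tmp/knowledge", "ip_v4_built_in.mmdb").length);
    }
}
```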
@@ -1,11 +1,14 @@
|
||||
package com.zdjizhi.utils;
|
||||
|
||||
import cn.hutool.core.io.file.FileReader;
|
||||
import cn.hutool.crypto.digest.DigestUtil;
|
||||
import com.zdjizhi.common.CommonConfig;
|
||||
import com.zdjizhi.common.CustomFile;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@@ -86,20 +89,126 @@ public class IpUtils {
|
||||
}
|
||||
}
|
||||
|
||||
public static void updateIpLook(Map<String, byte[]> knowledgeFileCache){
|
||||
public static void updateIpLook(Map<String, String> knowledgeFileCache){
|
||||
try{
|
||||
IpLookupV2.Builder builder = new IpLookupV2.Builder(false);
|
||||
ipLookup= builder.loadDataFileV4(new ByteArrayInputStream(knowledgeFileCache.get("ip_v4_built_in.mmdb")))
|
||||
.loadDataFileV6(new ByteArrayInputStream(knowledgeFileCache.get("ip_v6_built_in.mmdb")))
|
||||
.loadDataFilePrivateV4(new ByteArrayInputStream(knowledgeFileCache.get("ip_v4_user_defined.mmdb")))
|
||||
.loadDataFilePrivateV6(new ByteArrayInputStream(knowledgeFileCache.get("ip_v6_user_defined.mmdb")))
|
||||
.build();
|
||||
if ("CLUSTER".equals(CommonConfig.CLUSTER_OR_SINGLE)) {
|
||||
byte[] ipv4BuiltBytes = HdfsUtils.getFileBytes(CommonConfig.HDFS_PATH + "ip_v4_built_in.mmdb");
|
||||
if (ipv4BuiltBytes!=null){
|
||||
if (knowledgeFileCache.containsKey("ip_v4_built_in.mmdb")){
|
||||
String sha256 = knowledgeFileCache.get("ip_v4_built_in.mmdb");
|
||||
byte[] localFileByte = getLocalFile("ip_v4_built_in.mmdb");
|
||||
String localFileSha256Hex = DigestUtil.sha256Hex(localFileByte);
|
||||
if (sha256.equals(localFileSha256Hex)){
|
||||
builder.loadDataFileV4(new ByteArrayInputStream(ipv4BuiltBytes));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
byte[] ipv6BuiltBytes = HdfsUtils.getFileBytes(CommonConfig.HDFS_PATH + "ip_v6_built_in.mmdb");
|
||||
if (ipv6BuiltBytes!=null){
|
||||
if (knowledgeFileCache.containsKey("ip_v6_built_in.mmdb")) {
|
||||
String sha256 = knowledgeFileCache.get("ip_v6_built_in.mmdb");
|
||||
byte[] localFileByte = getLocalFile("ip_v6_built_in.mmdb");
|
||||
String localFileSha256Hex = DigestUtil.sha256Hex(localFileByte);
|
||||
if (sha256.equals(localFileSha256Hex)) {
|
||||
builder.loadDataFileV6(new ByteArrayInputStream(ipv6BuiltBytes));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
byte[] ipv4UserBytes = HdfsUtils.getFileBytes(CommonConfig.HDFS_PATH + "ip_v4_user_defined.mmdb");
|
||||
                if (ipv4UserBytes != null) {
                    if (knowledgeFileCache.containsKey("ip_v4_user_defined.mmdb")) {
                        String sha256 = knowledgeFileCache.get("ip_v4_user_defined.mmdb");
                        byte[] localFileByte = getLocalFile("ip_v4_user_defined.mmdb");
                        String localFileSha256Hex = DigestUtil.sha256Hex(localFileByte);
                        if (sha256.equals(localFileSha256Hex)) {
                            builder.loadDataFilePrivateV4(new ByteArrayInputStream(ipv4UserBytes));
                        }
                    }
                }

                byte[] ipv6UserBytes = HdfsUtils.getFileBytes(CommonConfig.HDFS_PATH + "ip_v6_user_defined.mmdb");
                if (ipv6UserBytes != null) {
                    if (knowledgeFileCache.containsKey("ip_v6_user_defined.mmdb")) {
                        String sha256 = knowledgeFileCache.get("ip_v6_user_defined.mmdb");
                        byte[] localFileByte = getLocalFile("ip_v6_user_defined.mmdb");
                        String localFileSha256Hex = DigestUtil.sha256Hex(localFileByte);
                        if (sha256.equals(localFileSha256Hex)) {
                            builder.loadDataFilePrivateV6(new ByteArrayInputStream(ipv6UserBytes));
                        }
                    }
                }
            } else if ("SINGLE".equals(CommonConfig.CLUSTER_OR_SINGLE)) {
                // Single-node deployment: read the knowledge files from the local download path instead of HDFS.
                byte[] ipv4BuiltBytes = FileByteUtils.getFileBytes(CommonConfig.DOWNLOAD_PATH + "ip_v4_built_in.mmdb");
                if (ipv4BuiltBytes != null) {
                    if (knowledgeFileCache.containsKey("ip_v4_built_in.mmdb")) {
                        String sha256 = knowledgeFileCache.get("ip_v4_built_in.mmdb");
                        byte[] localFileByte = getLocalFile("ip_v4_built_in.mmdb");
                        String localFileSha256Hex = DigestUtil.sha256Hex(localFileByte);
                        if (sha256.equals(localFileSha256Hex)) {
                            builder.loadDataFileV4(new ByteArrayInputStream(ipv4BuiltBytes));
                        }
                    }
                }

                byte[] ipv6BuiltBytes = FileByteUtils.getFileBytes(CommonConfig.DOWNLOAD_PATH + "ip_v6_built_in.mmdb");
                if (ipv6BuiltBytes != null) {
                    if (knowledgeFileCache.containsKey("ip_v6_built_in.mmdb")) {
                        String sha256 = knowledgeFileCache.get("ip_v6_built_in.mmdb");
                        byte[] localFileByte = getLocalFile("ip_v6_built_in.mmdb");
                        String localFileSha256Hex = DigestUtil.sha256Hex(localFileByte);
                        if (sha256.equals(localFileSha256Hex)) {
                            builder.loadDataFileV6(new ByteArrayInputStream(ipv6BuiltBytes));
                        }
                    }
                }

                byte[] ipv4UserBytes = FileByteUtils.getFileBytes(CommonConfig.DOWNLOAD_PATH + "ip_v4_user_defined.mmdb");
                if (ipv4UserBytes != null) {
                    if (knowledgeFileCache.containsKey("ip_v4_user_defined.mmdb")) {
                        String sha256 = knowledgeFileCache.get("ip_v4_user_defined.mmdb");
                        byte[] localFileByte = getLocalFile("ip_v4_user_defined.mmdb");
                        String localFileSha256Hex = DigestUtil.sha256Hex(localFileByte);
                        if (sha256.equals(localFileSha256Hex)) {
                            builder.loadDataFilePrivateV4(new ByteArrayInputStream(ipv4UserBytes));
                        }
                    }
                }

                byte[] ipv6UserBytes = FileByteUtils.getFileBytes(CommonConfig.DOWNLOAD_PATH + "ip_v6_user_defined.mmdb");
                if (ipv6UserBytes != null) {
                    if (knowledgeFileCache.containsKey("ip_v6_user_defined.mmdb")) {
                        String sha256 = knowledgeFileCache.get("ip_v6_user_defined.mmdb");
                        byte[] localFileByte = getLocalFile("ip_v6_user_defined.mmdb");
                        String localFileSha256Hex = DigestUtil.sha256Hex(localFileByte);
                        if (sha256.equals(localFileSha256Hex)) {
                            builder.loadDataFilePrivateV6(new ByteArrayInputStream(ipv6UserBytes));
                        }
                    }
                }
            }
            // Rebuild the lookup with whichever data files passed the checksum check.
            ipLookup = builder.build();
        } catch (Exception e) {
            LOG.error("Failed to load IP lookup data files", e);
        }
    }

    private static byte[] getLocalFile(String name) {
        // Reference copy used for checksum verification: HDFS in cluster mode, local disk otherwise.
        byte[] fileBytes = null;
        try {
            fileBytes = "CLUSTER".equals(CommonConfig.CLUSTER_OR_SINGLE) ?
                    HdfsUtils.getFileBytes(CommonConfig.HDFS_PATH + name) :
                    new FileReader(CommonConfig.DOWNLOAD_PATH + name).readBytes();
        } catch (RuntimeException | IOException e) {
            e.printStackTrace();
        }
        return fileBytes;
    }

    public static void main(String[] args) {
        System.out.println(ipLookup.countryLookup("49.7.115.37"));
    }
}
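Each branch above repeats one guard: take the freshly read mmdb bytes, compare the SHA-256 digest cached in knowledgeFileCache against the digest of the local copy, and only hand the bytes to the builder when the two match. A minimal sketch of that guard, assuming the same Hutool DigestUtil; loadIfVerified and its MmdbLoader callback are illustrative helpers, not code from this repository:

import cn.hutool.crypto.digest.DigestUtil;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Map;

final class MmdbLoadGuard {

    @FunctionalInterface
    interface MmdbLoader {
        void load(ByteArrayInputStream in) throws IOException;
    }

    /**
     * Loads the given mmdb bytes only when their SHA-256 matches the digest
     * cached for that file name; otherwise the stale or partial copy is skipped.
     */
    static void loadIfVerified(Map<String, String> knowledgeFileCache,
                               String fileName,
                               byte[] downloadedBytes,
                               MmdbLoader loader) throws IOException {
        if (downloadedBytes == null || !knowledgeFileCache.containsKey(fileName)) {
            return;
        }
        String expectedSha256 = knowledgeFileCache.get(fileName);
        // The surrounding class digests getLocalFile(fileName); digesting the
        // downloaded bytes directly is equivalent when both point at the same file.
        String actualSha256 = DigestUtil.sha256Hex(downloadedBytes);
        if (expectedSha256.equals(actualSha256)) {
            loader.load(new ByteArrayInputStream(downloadedBytes));
        }
    }
}

Under that assumption each per-file block above would reduce to a single call such as loadIfVerified(knowledgeFileCache, "ip_v4_built_in.mmdb", ipv4BuiltBytes, builder::loadDataFileV4), provided loadDataFileV4 keeps accepting an InputStream and throws nothing broader than IOException.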
8
src/test/java/com/zdjizhi/ThresholdTestData/DNS-Flood
Normal file
@@ -0,0 +1,8 @@
--DosSketchLog
{common_sled_ip='null', common_data_center='null', sketch_start_time=1686277141, sketch_duration=59, attack_type='DNS Flood', source_ip='23.91.128.115', destination_ip='102.219.30.33', sketch_sessions=945, sketch_packets=945, sketch_bytes=446370, vsys_id=23}
{common_sled_ip='null', common_data_center='null', sketch_start_time=1686277205, sketch_duration=86, attack_type='DNS Flood', source_ip='172.217.160.68', destination_ip='10.113.83.88', sketch_sessions=730, sketch_packets=730, sketch_bytes=344575, vsys_id=1}
{common_sled_ip='null', common_data_center='null', sketch_start_time=1686277244, sketch_duration=47, attack_type='DNS Flood', source_ip='45.135.144.112', destination_ip='42.62.192.132', sketch_sessions=0, sketch_packets=0, sketch_bytes=47, vsys_id=1}

--DosDetectionThreshold
{profileId='6091', attackType='DNS Flood', serverIpList=[113.113.83.213, 42.62.192.132/28, 10.113.83.1/25, 102.219.30.33/29], serverIpAddr='null', packetsPerSec=1, bitsPerSec=1, sessionsPerSec=1, isValid=1, vsysId=1, superiorIds=[4, 12, 5, 27]}
{profileId='5679', attackType='DNS Flood', serverIpList=[102.219.30.33], serverIpAddr='null', packetsPerSec=500, bitsPerSec=1000000, sessionsPerSec=100000, isValid=1, vsysId=23, superiorIds=[4, 5]}
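Both DNS Flood profiles above mix plain server addresses with CIDR blocks (102.219.30.33/29, 10.113.83.1/25, and so on), so the detector presumably has to test each sketch's destination_ip for membership in that list when it picks a profile. A minimal IPv4-only sketch of such a check; ServerIpMatcher is a hypothetical helper written for illustration and is not part of this repository:

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.List;

public class ServerIpMatcher {

    /** True if ip equals an entry or falls inside an entry's CIDR range (IPv4 only). */
    public static boolean matches(String ip, List<String> serverIpList) throws UnknownHostException {
        long target = toLong(ip);
        for (String entry : serverIpList) {
            if (!entry.contains("/")) {
                if (entry.equals(ip)) {
                    return true;
                }
                continue;
            }
            String[] parts = entry.split("/");
            int prefix = Integer.parseInt(parts[1]);
            long mask = prefix == 0 ? 0 : (0xFFFFFFFFL << (32 - prefix)) & 0xFFFFFFFFL;
            if ((toLong(parts[0]) & mask) == (target & mask)) {
                return true;
            }
        }
        return false;
    }

    /** Packs a dotted-quad IPv4 address into an unsigned 32-bit value. */
    private static long toLong(String ip) throws UnknownHostException {
        byte[] b = InetAddress.getByName(ip).getAddress();
        if (b.length != 4) {
            // IPv6 entries would need the same masking logic on 128-bit values.
            throw new IllegalArgumentException("IPv4 only in this sketch: " + ip);
        }
        return ((b[0] & 0xFFL) << 24) | ((b[1] & 0xFFL) << 16) | ((b[2] & 0xFFL) << 8) | (b[3] & 0xFFL);
    }

    public static void main(String[] args) throws UnknownHostException {
        // destination_ip 10.113.83.88 from the second sketch falls inside 10.113.83.1/25 of profile 6091
        System.out.println(matches("10.113.83.88",
                Arrays.asList("113.113.83.213", "42.62.192.132/28", "10.113.83.1/25", "102.219.30.33/29")));
    }
}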
6
src/test/java/com/zdjizhi/ThresholdTestData/ICMP-Flood
Normal file
@@ -0,0 +1,6 @@
--DosSketchLog
{common_sled_ip='null', common_data_center='null', sketch_start_time=1686277232, sketch_duration=59, attack_type='ICMP Flood', source_ip='45.170.244.25', destination_ip='24.152.57.56', sketch_sessions=499, sketch_packets=499, sketch_bytes=111970, vsys_id=1}

--DosDetectionThreshold
{profileId='6093', attackType='ICMP Flood', serverIpList=[31.131.80.88/29, 24.152.57.56/29, 47.93.59.1/25], serverIpAddr='null', packetsPerSec=210, bitsPerSec=0, sessionsPerSec=0, isValid=1, vsysId=1, superiorIds=[4, 12, 5, 27]}
@@ -0,0 +1,7 @@
--DosSketchLog
{common_sled_ip='null', common_data_center='null', sketch_start_time=1685003938, sketch_duration=63714, attack_type='TCP SYN Flood', source_ip='5.32.144.55', destination_ip='45.188.134.11', sketch_sessions=0, sketch_packets=0, sketch_bytes=4195, vsys_id=1}
{common_sled_ip='null', common_data_center='null', sketch_start_time=1686277234, sketch_duration=57, attack_type='TCP SYN Flood', source_ip='18.65.148.128', destination_ip='23.200.74.224', sketch_sessions=54, sketch_packets=54, sketch_bytes=73427, vsys_id=1}

--DosDetectionThreshold
{profileId='6095', attackType='TCP SYN Flood', serverIpList=[23.200.74.224, 45.188.134.11/29, 41.183.0.15/29, 41.183.0.16/30], serverIpAddr='null', packetsPerSec=1, bitsPerSec=1, sessionsPerSec=1, isValid=1, vsysId=1, superiorIds=[5, 4, 12, 27]}
8
src/test/java/com/zdjizhi/ThresholdTestData/UDP-Flood
Normal file
@@ -0,0 +1,8 @@
--DosSketchLog
{common_sled_ip='null', common_data_center='null', sketch_start_time=1686277291, sketch_duration=0, attack_type='UDP Flood', source_ip='121.14.89.209', destination_ip='192.168.50.11', sketch_sessions=0, sketch_packets=0, sketch_bytes=0, vsys_id=1}
{common_sled_ip='null', common_data_center='null', sketch_start_time=1686277233, sketch_duration=58, attack_type='UDP Flood', source_ip='192.168.50.56,192.168.50.34,192.168.50.11,192.168.50.33,192.168.50.55,192.168.50.58,192.168.50.36,192.168.50.14,192.168.50.35,192.168.50.13,192.168.50.57,192.168.50.30,192.168.50.51,192.168.50.54,192.168.50.10,192.168.50.32,192.168.50.53,192.168.50.31,192.168.50.16,192.168.50.38,192.168.50.15,192.168.50.37,192.168.50.18,192.168.50.17,192.168.50.50,192.168.50.45,192.168.50.23,192.168.50.22,192.168.50.44,192.168.50.25,192.168.50.47,192.168.50.46,192.168.50.24,192.168.50.63,192.168.50.41,192.168.50.40,192.168.50.62,192.168.50.43,192.168.50.21,192.168.50.20,192.168.50.42,192.168.50.27,192.168.50.26,192.168.50.48,192.168.50.28,192.168.50.61,192.168.50.60', destination_ip='121.14.89.209', sketch_sessions=297, sketch_packets=297, sketch_bytes=371404, vsys_id=1}

--DosDetectionThreshold
{profileId='5333', attackType='UDP Flood', serverIpList=[192.168.50.11, 192.168.50.12], serverIpAddr='null', packetsPerSec=50, bitsPerSec=50, sessionsPerSec=50, isValid=1, vsysId=1, superiorIds=[4, 12, 5, 27]}
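As a concrete reading of this data: the second sketch reports sketch_sessions=297 against sessionsPerSec=50 from profile 5333, and the DosDetectionTest further down grades the overshoot by the ratio (observed - base) / base. A small standalone sketch of that arithmetic, for illustration only:

import java.math.BigDecimal;
import java.math.RoundingMode;

public class UdpFloodRatioExample {
    public static void main(String[] args) {
        long base = 50;               // sessionsPerSec from profile 5333 above
        long observed = 297;          // sketch_sessions from the second UDP Flood record
        long diff = observed - base;  // 247
        double ratio = BigDecimal.valueOf((double) diff / base)
                .setScale(4, RoundingMode.HALF_UP)
                .doubleValue();       // 4.94
        // 2.5 <= 4.94 < 5, which the test class below maps to the "Major" severity band
        System.out.println("overshoot ratio = " + ratio);
    }
}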
237
src/test/java/com/zdjizhi/etl/DosDetectionTest.java
Normal file
@@ -0,0 +1,237 @@
package com.zdjizhi.etl;

import com.zdjizhi.common.DosDetectionThreshold;
import com.zdjizhi.common.DosEventLog;
import com.zdjizhi.common.DosSketchLog;
import com.zdjizhi.utils.IpUtils;
import com.zdjizhi.utils.NacosUtils;
import com.zdjizhi.utils.SnowflakeId;
import com.zdjizhi.utils.StringUtil;
import org.apache.commons.lang3.StringUtils;
import org.junit.Test;

import java.math.BigDecimal;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.HashSet;

public class DosDetectionTest {

    private final static NumberFormat PERCENT_INSTANCE = NumberFormat.getPercentInstance();
    private final static int BASELINE_SIZE = 144;
    private final static int STATIC_CONDITION_TYPE = 1;
    private final static int BASELINE_CONDITION_TYPE = 2;
    private final static int SENSITIVITY_CONDITION_TYPE = 3;

    private final static String SESSIONS_TAG = "sessions";
    private final static String PACKETS_TAG = "packets";
    private final static String BITS_TAG = "bits";

    @Test
    public void dosDetectionTest() {
        DosDetectionThreshold dosDetectionThreshold = new DosDetectionThreshold();
        ArrayList<String> serverIpList = new ArrayList<>();
        serverIpList.add("192.168.50.11");
        serverIpList.add("192.168.50.1/24");
        serverIpList.add("FC::12:0:0/54");
        serverIpList.add("FC::12:0:0");
        dosDetectionThreshold.setProfileId(4437);
        dosDetectionThreshold.setAttackType("DNS Flood");
        dosDetectionThreshold.setServerIpList(serverIpList);
        dosDetectionThreshold.setSessionsPerSec(1);
        dosDetectionThreshold.setPacketsPerSec(1);
        dosDetectionThreshold.setBitsPerSec(100000);
        dosDetectionThreshold.setIsValid(1);
        dosDetectionThreshold.setSuperiorIds(new Integer[]{5, 4, 12, 27});

        DosSketchLog dosSketchLog = new DosSketchLog();

        dosSketchLog.setSketch_sessions(68);
        dosSketchLog.setSketch_packets(68);
        dosSketchLog.setSketch_bytes(285820); //185.82
        dosSketchLog.setVsys_id(1);
        dosSketchLog.setAttack_type("ICMP Flood");
        dosSketchLog.setSource_ip("45.170.244.25");
        dosSketchLog.setDestination_ip("24.152.57.56");

        // Fetch the static thresholds.
        long sessionBase = dosDetectionThreshold.getSessionsPerSec();
        long pktBase = dosDetectionThreshold.getPacketsPerSec();
        long bitBase = dosDetectionThreshold.getBitsPerSec();

        // Compute the rate-based differences against those thresholds.
        long diffSession = dosSketchLog.getSketch_sessions() - sessionBase;
        long diffPkt = dosSketchLog.getSketch_packets() - pktBase;
        long diffByte = dosSketchLog.getSketch_bytes() - bitBase;

        Double diffSessionPercent = getDiffPercent(diffSession, sessionBase) * 100;
        Double diffPktPercent = getDiffPercent(diffPkt, pktBase) * 100;
        Double diffBitPercent = getDiffPercent(diffByte, bitBase) * 100;
        long profileId = 0;
        DosEventLog result = null;
        if (diffSessionPercent >= diffPktPercent && diffSessionPercent >= diffBitPercent) {
            profileId = dosDetectionThreshold.getProfileId();
            result = getDosEventLog(dosSketchLog, sessionBase, diffSession, profileId, STATIC_CONDITION_TYPE, SESSIONS_TAG);
            System.out.println(result);
        } else if (diffPktPercent >= diffSessionPercent && diffPktPercent >= diffBitPercent) {
            profileId = dosDetectionThreshold.getProfileId();
            result = getDosEventLog(dosSketchLog, pktBase, diffPkt, profileId, STATIC_CONDITION_TYPE, PACKETS_TAG);
            System.out.println(result);
        } else if (diffBitPercent >= diffPktPercent && diffBitPercent >= diffSessionPercent) {
            profileId = dosDetectionThreshold.getProfileId();
            result = getDosEventLog(dosSketchLog, bitBase, diffByte, profileId, STATIC_CONDITION_TYPE, BITS_TAG);
            System.out.println(result);
        }
    }

    private DosEventLog getDosEventLog(DosSketchLog value, long base, long diff, long profileId, int type, String tag) {
        DosEventLog result = null;
        String destinationIp = value.getDestination_ip();
        String attackType = value.getAttack_type();
        if (diff > 0 && base != 0) {
            double percent = getDiffPercent(diff, base);
            Severity severity = judgeSeverity(percent);
            Integer staticSensitivityThreshold = 100;
            if (severity != Severity.NORMAL) {
                if (type == BASELINE_CONDITION_TYPE && percent < 0.2) {
                    // logger.debug("Server IP {}, type {}: baseline {} exceeded by {}, below the baseline sensitivity threshold, log details\n{}", destinationIp, attackType, base, percent, value);
                } else if ((type == BASELINE_CONDITION_TYPE || type == SENSITIVITY_CONDITION_TYPE) && value.getSketch_sessions() < staticSensitivityThreshold) {
                    // logger.debug("Server IP {}, type {}: baseline {} exceeded by {}, below the static sensitivity threshold, log details\n{}", destinationIp, attackType, base, percent, value);
                } else {
                    result = getResult(value, base, profileId, severity, percent + 1, type, tag);
                    if (type == SENSITIVITY_CONDITION_TYPE) {
                        result.setSeverity(Severity.MAJOR.severity);
                    }
                    // logger.info("Detected a {} anomaly... server IP {}, {} anomaly exceeding baseline {} by {}x, detected via {}:{}, log details\n {}", destinationIp, attackType, base, percent, type, tag, result);
                }
            }
            // else {
            //     logger.debug("Server IP {} shows no {} anomaly, log details {}", destinationIp, attackType, value);
            // }
        }
        return result;
    }

    private DosEventLog getResult(DosSketchLog value, long base, long profileId, Severity severity, double percent, int type, String tag) {
        DosEventLog dosEventLog = new DosEventLog();
        // dosEventLog.setLog_id(SnowflakeId.generateId());
        dosEventLog.setVsys_id(value.getVsys_id());
        dosEventLog.setStart_time(value.getSketch_start_time());
        dosEventLog.setEnd_time(value.getSketch_start_time() + value.getSketch_duration());
        dosEventLog.setProfile_id(profileId);
        dosEventLog.setAttack_type(value.getAttack_type());
        dosEventLog.setSeverity(severity.severity);
        // dosEventLog.setConditions(getConditions(PERCENT_INSTANCE.format(percent), base, value.getSketch_sessions(), type, tag));
        dosEventLog.setConditions(getConditions(percent, base, value.getSketch_sessions(), type, tag, dosEventLog));
        dosEventLog.setDestination_ip(value.getDestination_ip());
        // dosEventLog.setDestination_country(IpUtils.ipLookup.countryLookup(value.getDestination_ip()));
        String ipList = value.getSource_ip();
        dosEventLog.setSource_ip_list(ipList);
        dosEventLog.setSource_country_list(getSourceCountryList(ipList));
        dosEventLog.setSession_rate(value.getSketch_sessions());
        dosEventLog.setPacket_rate(value.getSketch_packets());
        dosEventLog.setBit_rate(value.getSketch_bytes());
        return dosEventLog;
    }

    public String getConditions(double percent, long base, long sessions, int type, String tag, DosEventLog dosEventLog) {
        int condition = 0;
        if ("Minor".equals(dosEventLog.getSeverity())) {
            condition = 50;
        } else if ("Warning".equals(dosEventLog.getSeverity())) {
            condition = 100;
        } else if ("Major".equals(dosEventLog.getSeverity())) {
            condition = 250;
        } else if ("Severe".equals(dosEventLog.getSeverity())) {
            condition = 500;
        } else if ("Critical".equals(dosEventLog.getSeverity())) {
            condition = 800;
        }
        switch (type) {
            case STATIC_CONDITION_TYPE:
                return "Rate > " +
                        base + " " +
                        tag + "/s" + "(>" + condition + "%)";
            case BASELINE_CONDITION_TYPE:
                return tag + " > " +
                        PERCENT_INSTANCE.format(percent) + " of baseline";
            case SENSITIVITY_CONDITION_TYPE:
                return String.valueOf(sessions) + " " +
                        tag + "/s Unusually high " +
                        StringUtils.capitalize(tag);
            default:
                throw new IllegalArgumentException("Illegal Argument type:" + type + ", known types = [1,2,3]");
        }
    }

    private String getSourceCountryList(String sourceIpList) {
        if (StringUtil.isNotBlank(sourceIpList)) {
            String countryList;
            try {
                String[] ipArr = sourceIpList.split(",");
                HashSet<String> countrySet = new HashSet<>();
                for (String ip : ipArr) {
                    String country = IpUtils.ipLookup.countryLookup(ip);
                    if (StringUtil.isNotBlank(country)) {
                        countrySet.add(country);
                    }
                }
                countryList = StringUtils.join(countrySet, ", ");
                return countryList;
            } catch (Exception e) {
                // logger.error("Failed to look up countries for source IP list {}", sourceIpList, e);
                return StringUtil.EMPTY;
            }
        } else {
            throw new IllegalArgumentException("Illegal Argument sourceIpList = null");
        }
    }

    private Double getDiffPercent(long diff, long base) {
        return BigDecimal.valueOf((float) diff / base).setScale(4, BigDecimal.ROUND_HALF_UP).doubleValue();
    }

    private Severity judgeSeverity(double diffPercent) {
        if (diffPercent >= 0.5 && diffPercent < 1) {
            return Severity.MINOR;
        } else if (diffPercent >= 1 && diffPercent < 2.5) {
            return Severity.WARNING;
        } else if (diffPercent >= 2.5 && diffPercent < 5) {
            return Severity.MAJOR;
        } else if (diffPercent >= 5 && diffPercent < 8) {
            return Severity.SEVERE;
        } else if (diffPercent >= 8) {
            return Severity.CRITICAL;
        } else {
            return Severity.NORMAL;
        }
    }

    private enum Severity {
        /**
         * Severity levels used to grade how far a rate exceeds its baseline or threshold.
         */
        CRITICAL("Critical"),
        SEVERE("Severe"),
        MAJOR("Major"),
        WARNING("Warning"),
        MINOR("Minor"),
        NORMAL("Normal");

        private final String severity;

        @Override
        public String toString() {
            return this.severity;
        }

        Severity(String severity) {
            this.severity = severity;
        }

    }
}
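The test class above encodes two related mappings: judgeSeverity buckets the overshoot ratio into a severity band, and getConditions turns that severity back into the percentage shown in the static condition string (Minor 50%, Warning 100%, Major 250%, Severe 500%, Critical 800%). A compact restatement of the bands, fed with the ICMP-Flood test data from earlier in this diff; this is an illustrative sketch, not part of the test suite:

public class SeverityBandsExample {

    /** Same bands as judgeSeverity above, expressed as a single cascade. */
    static String judge(double ratio) {
        if (ratio >= 8) {
            return "Critical"; // static condition string shows >800%
        } else if (ratio >= 5) {
            return "Severe";   // >500%
        } else if (ratio >= 2.5) {
            return "Major";    // >250%
        } else if (ratio >= 1) {
            return "Warning";  // >100%
        } else if (ratio >= 0.5) {
            return "Minor";    // >50%
        }
        return "Normal";
    }

    public static void main(String[] args) {
        // ICMP-Flood test data: 499 packets/s against packetsPerSec=210 gives a ratio of about 1.376
        double ratio = (499.0 - 210) / 210;
        System.out.println(judge(ratio) + " at ratio " + ratio); // Warning
    }
}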
11
src/test/java/com/zdjizhi/etl/EtlProcessFunctionTest.java
Normal file
@@ -0,0 +1,11 @@
package com.zdjizhi.etl;

import org.junit.Test;

public class EtlProcessFunctionTest {

    @Test
    public void EtlProcessFunctionTest() {

    }
}