13 Commits

20 changed files with 831 additions and 229 deletions

25
pom.xml
View File

@@ -20,7 +20,7 @@
<repository>
<id>nexus</id>
<name>Team Nexus Repository</name>
<url>http://192.168.40.125:8099/content/groups/public</url>
<url>http://192.168.40.153:8099/content/groups/public</url>
</repository>
<repository>
@@ -184,15 +184,19 @@
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.apache.hadoop</groupId>-->
<!-- <artifactId>hadoop-hdfs</artifactId>-->
<!-- <version>${hadoop.version}</version>-->
<!-- </dependency>-->
<!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
<dependency>
@@ -257,7 +261,7 @@
<dependency>
<groupId>com.zdjizhi</groupId>
<artifactId>galaxy</artifactId>
<version>1.1.1</version>
<version>1.1.3</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
@@ -274,6 +278,13 @@
</exclusions>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.32</version>
</dependency>
<dependency>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-client</artifactId>

View File

@@ -9,7 +9,7 @@ import java.util.Objects;
* @author wlh
*/
public class DosDetectionThreshold implements Serializable {
private String profileId;
private long profileId;
private String attackType;
private ArrayList<String> serverIpList;
private String serverIpAddr;
@@ -20,11 +20,11 @@ public class DosDetectionThreshold implements Serializable {
private int vsysId;
private Integer[] superiorIds;
public String getProfileId() {
public long getProfileId() {
return profileId;
}
public void setProfileId(String profileId) {
public void setProfileId(long profileId) {
this.profileId = profileId;
}

View File

@@ -8,6 +8,7 @@ public class DosEventLog implements Serializable,Cloneable {
private int vsys_id;
private long start_time;
private long end_time;
private long profile_id;
private String attack_type;
private String severity;
private String conditions;
@@ -51,6 +52,14 @@ public class DosEventLog implements Serializable,Cloneable {
this.end_time = end_time;
}
public long getProfile_id() {
return profile_id;
}
public void setProfile_id(long profile_id) {
this.profile_id = profile_id;
}
public String getAttack_type() {
return attack_type;
}
@@ -138,6 +147,7 @@ public class DosEventLog implements Serializable,Cloneable {
", vsys_id=" + vsys_id +
", start_time=" + start_time +
", end_time=" + end_time +
", profile_id=" + profile_id +
", attack_type='" + attack_type + '\'' +
", severity='" + severity + '\'' +
", conditions='" + conditions + '\'' +

View File

@@ -1,5 +1,6 @@
package com.zdjizhi.etl;
import cn.hutool.core.math.MathUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.*;
@@ -23,7 +24,7 @@ import java.util.concurrent.TimeUnit;
/**
* @author wlh
*/
public class DosDetection extends BroadcastProcessFunction<DosSketchLog,Map<String, byte[]>, DosEventLog> {
public class DosDetection extends BroadcastProcessFunction<DosSketchLog,Map<String, String>, DosEventLog> {
private static final Log logger = LogFactory.get();
private static Map<String, Map<String, DosBaselineThreshold>> baselineMap = new HashMap<>();
@@ -99,15 +100,17 @@ public class DosDetection extends BroadcastProcessFunction<DosSketchLog,Map<Stri
}
@Override
public void processBroadcastElement(Map<String, byte[]> value, Context ctx, Collector<DosEventLog> out) throws Exception {
IpUtils.updateIpLook(value);
public void processBroadcastElement(Map<String, String> value, Context ctx, Collector<DosEventLog> out) throws Exception {
if (!value.isEmpty()){
IpUtils.updateIpLook(value);
}
}
private DosEventLog getDosEventLogBySensitivityThreshold(DosSketchLog value) {
long sketchSessions = value.getSketch_sessions();
Integer staticSensitivityThreshold = NacosUtils.getIntProperty("static.sensitivity.threshold");
long diff = sketchSessions - staticSensitivityThreshold;
return getDosEventLog(value, staticSensitivityThreshold, diff, SENSITIVITY_CONDITION_TYPE, SESSIONS_TAG);
return getDosEventLog(value, staticSensitivityThreshold, diff,0, SENSITIVITY_CONDITION_TYPE, SESSIONS_TAG);
}
private DosEventLog getDosEventLogByBaseline(DosSketchLog value,String key) {
@@ -116,42 +119,52 @@ public class DosDetection extends BroadcastProcessFunction<DosSketchLog,Map<Stri
DosBaselineThreshold dosBaselineThreshold = baselineMap.get(key).get(attackType);
Integer base = getBaseValue(dosBaselineThreshold, value);
long diff = sketchSessions - base;
return getDosEventLog(value, base, diff, BASELINE_CONDITION_TYPE, SESSIONS_TAG);
return getDosEventLog(value, base, diff, 0,BASELINE_CONDITION_TYPE, SESSIONS_TAG);
}
private DosEventLog getDosEventLogByStaticThreshold(DosSketchLog value, DosDetectionThreshold threshold) throws CloneNotSupportedException {
long base = threshold.getSessionsPerSec();
long diff = value.getSketch_sessions() - base;
DosEventLog result = getDosEventLog(value, base, diff, STATIC_CONDITION_TYPE, SESSIONS_TAG);
if (result == null) {
base = threshold.getPacketsPerSec();
diff = value.getSketch_packets() - base;
result = getDosEventLog(value, base, diff, STATIC_CONDITION_TYPE, PACKETS_TAG);
if (result == null) {
base = threshold.getBitsPerSec();
diff = value.getSketch_bytes() - base;
result = getDosEventLog(value, base, diff, STATIC_CONDITION_TYPE, BITS_TAG);
}
long sessionBase = threshold.getSessionsPerSec();
long pktBase=threshold.getPacketsPerSec();
long bitBase=threshold.getBitsPerSec();
long diffSession = value.getSketch_sessions() - sessionBase;
long diffPkt = value.getSketch_packets() - pktBase;
long diffByte = value.getSketch_bytes() - bitBase;
// Double diffSessionPercent = getDiffPercent(diffSession, sessionBase)*100;
// Double diffPktPercent = getDiffPercent(diffPkt, pktBase)*100;
// Double diffBitPercent = getDiffPercent(diffByte, bitBase)*100;
double diffSessionPercent=0.0;
double diffPktPercent=0.0;
double diffBitPercent=0.0;
if (sessionBase != 0 && sessionBase > 0){
diffSessionPercent = getDiffPercent(diffSession, sessionBase)*100;
}
/*
ArrayList<DosEventLog> dosEventLogs = new ArrayList<>();
if (result != null){
dosEventLogs.add(result);
Integer[] superiorIds = threshold.getSuperiorIds();
if (superiorIds != null && superiorIds.length > 0){
for (Integer integer:superiorIds){
DosEventLog clone = (DosEventLog) result.clone();
clone.setVsys_id(integer);
clone.setLog_id(SnowflakeId.generateId());
dosEventLogs.add(clone);
}
}
else if (pktBase != 0 && pktBase > 0){
diffPktPercent = getDiffPercent(diffPkt, pktBase)*100;
}
else if (bitBase != 0 && bitBase > 0){
diffBitPercent = getDiffPercent(diffByte, bitBase)*100;
}
long profileId = 0;
DosEventLog result =null;
if (diffSessionPercent >= diffPktPercent && diffSessionPercent >= diffBitPercent){
profileId = threshold.getProfileId();
result= getDosEventLog(value, sessionBase, diffSession, profileId, STATIC_CONDITION_TYPE, SESSIONS_TAG);
}else if (diffPktPercent >= diffSessionPercent && diffPktPercent >= diffBitPercent){
profileId = threshold.getProfileId();
result = getDosEventLog(value, pktBase, diffPkt,profileId, STATIC_CONDITION_TYPE, PACKETS_TAG);
}else if (diffBitPercent >= diffPktPercent && diffBitPercent >= diffSessionPercent){
profileId = threshold.getProfileId();
result = getDosEventLog(value, bitBase, diffByte, profileId, STATIC_CONDITION_TYPE, BITS_TAG);
}
*/
return result;
}
private DosEventLog getDosEventLog(DosSketchLog value, long base, long diff, int type, String tag) {
private DosEventLog getDosEventLog(DosSketchLog value, long base, long diff, long profileId, int type, String tag) {
DosEventLog result = null;
String destinationIp = value.getDestination_ip();
String attackType = value.getAttack_type();
@@ -165,7 +178,8 @@ public class DosDetection extends BroadcastProcessFunction<DosSketchLog,Map<Stri
}else if ((type == BASELINE_CONDITION_TYPE || type == SENSITIVITY_CONDITION_TYPE) && value.getSketch_sessions() < staticSensitivityThreshold){
logger.debug("当前server IP:{},类型:{},基线值{}百分比{}未超过静态敏感阈值,日志详情\n{}",destinationIp, attackType, base, percent, value);
}else {
result = getResult(value, base, severity, percent+1, type, tag);
// result = getResult(value, base, profileId, severity, percent+1, type, tag);
result = getResult(value, base, profileId, severity, percent, type, tag);
if (type == SENSITIVITY_CONDITION_TYPE){
result.setSeverity(Severity.MAJOR.severity);
}
@@ -178,15 +192,17 @@ public class DosDetection extends BroadcastProcessFunction<DosSketchLog,Map<Stri
return result;
}
private DosEventLog getResult(DosSketchLog value, long base, Severity severity, double percent, int type, String tag) {
private DosEventLog getResult(DosSketchLog value, long base, long profileId, Severity severity, double percent, int type, String tag) {
DosEventLog dosEventLog = new DosEventLog();
dosEventLog.setLog_id(SnowflakeId.generateId());
dosEventLog.setVsys_id(value.getVsys_id());
dosEventLog.setStart_time(value.getSketch_start_time());
dosEventLog.setEnd_time(value.getSketch_start_time() + value.getSketch_duration());
dosEventLog.setProfile_id(profileId);
dosEventLog.setAttack_type(value.getAttack_type());
dosEventLog.setSeverity(severity.severity);
dosEventLog.setConditions(getConditions(PERCENT_INSTANCE.format(percent), base, value.getSketch_sessions(), type, tag));
dosEventLog.setConditions(getConditions(PERCENT_INSTANCE.format(percent), base, value.getSketch_sessions(), type, tag,dosEventLog));
// dosEventLog.setConditions(getConditions(percent, base, value.getSketch_sessions(), type, tag,dosEventLog));
dosEventLog.setDestination_ip(value.getDestination_ip());
dosEventLog.setDestination_country(IpUtils.ipLookup.countryLookup(value.getDestination_ip()));
String ipList = value.getSource_ip();
@@ -223,12 +239,24 @@ public class DosDetection extends BroadcastProcessFunction<DosSketchLog,Map<Stri
return base;
}
private String getConditions(String percent, long base, long sessions, int type, String tag) {
private String getConditions(String percent, long base, long sessions, int type, String tag,DosEventLog dosEventLog) {
int condition =0;
if ("Minor".equals(dosEventLog.getSeverity())){
condition=50;
}else if ("Warning".equals(dosEventLog.getSeverity())){
condition=100;
}else if ("Major".equals(dosEventLog.getSeverity())){
condition=250;
}else if ("Severe".equals(dosEventLog.getSeverity())){
condition=500;
}else if ("Critical".equals(dosEventLog.getSeverity())){
condition =800;
}
switch (type) {
case STATIC_CONDITION_TYPE:
return "Rate > " +
base + " " +
tag + "/s";
tag + "/s" + "(>"+condition+"%)";
case BASELINE_CONDITION_TYPE:
return tag + " > " +
percent + " of baseline";
@@ -285,7 +313,13 @@ public class DosDetection extends BroadcastProcessFunction<DosSketchLog,Map<Stri
}
private Double getDiffPercent(long diff, long base) {
return BigDecimal.valueOf((float) diff / base).setScale(4, BigDecimal.ROUND_HALF_UP).doubleValue();
try {
return BigDecimal.valueOf((float) diff / base).setScale(4, BigDecimal.ROUND_HALF_UP).doubleValue();
}catch (Exception e){
logger.info("当前阈值为0,进行下一阈值条件判断",e);
return 0.0;
}
}
private Severity judgeSeverity(double diffPercent) {

View File

@@ -38,6 +38,8 @@ public class ParseBaselineThreshold {
config.set("hbase.client.retries.number", "3");
config.set("hbase.bulkload.retries.number", "3");
config.set("zookeeper.recovery.retry", "3");
config.set("hbase.defaults.for.version", "2.2.3");
config.set("hbase.defaults.for.version.skip", "true");
config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, CommonConfig.HBASE_CLIENT_OPERATION_TIMEOUT);
config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CommonConfig.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);

View File

@@ -1,11 +1,12 @@
package com.zdjizhi.etl;
import com.alibaba.fastjson2.JSONObject;
import com.fasterxml.jackson.databind.JavaType;
import com.zdjizhi.common.CommonConfig;
import com.zdjizhi.common.DosSketchLog;
import com.zdjizhi.source.DosSketchSource;
import com.zdjizhi.utils.FlinkEnvironmentUtils;
import com.zdjizhi.utils.JsonMapper;
//import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.StringUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
@@ -23,9 +24,9 @@ import java.util.*;
public class ParseSketchLog {
private static Logger logger = LoggerFactory.getLogger(ParseSketchLog.class);
private static JsonMapper jsonMapperInstance = JsonMapper.getInstance();
private static JavaType hashmapJsonType = jsonMapperInstance.createCollectionType(HashMap.class, String.class, Object.class);
private static JavaType listType = jsonMapperInstance.createCollectionType(ArrayList.class, HashMap.class);
// private static JsonMapper jsonMapperInstance = JsonMapper.getInstance();
// private static JavaType hashmapJsonType = jsonMapperInstance.createCollectionType(HashMap.class, String.class, Object.class);
// private static JavaType listType = jsonMapperInstance.createCollectionType(ArrayList.class, HashMap.class);
public static SingleOutputStreamOperator<DosSketchLog> getSketchSource(){
@@ -47,12 +48,15 @@ public class ParseSketchLog {
public void flatMap(String s, Collector<DosSketchLog> collector) {
try {
if (StringUtil.isNotBlank(s)){
HashMap<String, Object> sketchSource = jsonMapperInstance.fromJson(s, hashmapJsonType);
HashMap<String, Object> sketchSource = JSONObject.parseObject(s, HashMap.class);
// HashMap<String, Object> sketchSource = jsonMapperInstance.fromJson(s, hashmapJsonType);
long sketchStartTime = Long.parseLong(sketchSource.get("sketch_start_time").toString());
long sketchDuration = Long.parseLong(sketchSource.get("sketch_duration").toString());
String attackType = sketchSource.get("attack_type").toString();
int vsysId = Integer.parseInt(sketchSource.getOrDefault("common_vsys_id", 1).toString());
ArrayList<HashMap<String, Object>> reportIpList = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(sketchSource.get("report_ip_list")), listType);
String report_ip_list = JSONObject.toJSONString(sketchSource.get("report_ip_list"));
ArrayList<HashMap<String, Object>> reportIpList = JSONObject.parseObject(report_ip_list, ArrayList.class);
// ArrayList<HashMap<String, Object>> reportIpList = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(sketchSource.get("report_ip_list")), listType);
for (HashMap<String, Object> obj : reportIpList) {
DosSketchLog dosSketchLog = new DosSketchLog();
dosSketchLog.setSketch_start_time(sketchStartTime);

View File

@@ -2,12 +2,14 @@ package com.zdjizhi.etl;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.fasterxml.jackson.databind.JavaType;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
//import com.fasterxml.jackson.databind.JavaType;
import com.zdjizhi.common.CommonConfig;
import com.zdjizhi.common.DosDetectionThreshold;
import com.zdjizhi.common.DosVsysId;
import com.zdjizhi.utils.HttpClientUtils;
import com.zdjizhi.utils.JsonMapper;
//import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.NacosUtils;
import inet.ipaddr.IPAddress;
import inet.ipaddr.IPAddressString;
@@ -19,6 +21,7 @@ import org.apache.http.message.BasicHeader;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
@@ -29,10 +32,10 @@ public class ParseStaticThreshold {
private static final Log logger = LogFactory.get();
private static String encryptpwd;
private static JsonMapper jsonMapperInstance = JsonMapper.getInstance();
private static JavaType hashmapJsonType = jsonMapperInstance.createCollectionType(HashMap.class, String.class, Object.class);
private static JavaType thresholdType = jsonMapperInstance.createCollectionType(ArrayList.class, DosDetectionThreshold.class);
private static JavaType vsysIDType = jsonMapperInstance.createCollectionType(ArrayList.class, DosVsysId.class);
// private static JsonMapper jsonMapperInstance = JsonMapper.getInstance();
// private static JavaType hashmapJsonType = jsonMapperInstance.createCollectionType(HashMap.class, String.class, Object.class);
// private static JavaType thresholdType = jsonMapperInstance.createCollectionType(ArrayList.class, DosDetectionThreshold.class);
// private static JavaType vsysIDType = jsonMapperInstance.createCollectionType(ArrayList.class, DosVsysId.class);
static {
//加载加密登录密码
@@ -51,11 +54,14 @@ public class ParseStaticThreshold {
HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_ENCRYPTPWD_PATH, parms);
String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build());
if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) {
HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType);
HashMap<String, Object> resposeMap = JSONObject.parseObject(resposeJsonStr, HashMap.class);
// HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType);
boolean success = (boolean) resposeMap.get("success");
String msg = resposeMap.get("msg").toString();
if (success) {
HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
HashMap<String, Object> data = JSONObject.parseObject(JSONObject.toJSONString(resposeMap.get("data")), HashMap.class);
// HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
psw = data.get("encryptpwd").toString();
} else {
logger.error(msg);
@@ -85,11 +91,13 @@ public class ParseStaticThreshold {
HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_LOGIN_PATH, parms);
String resposeJsonStr = HttpClientUtils.httpPost(uriBuilder.build(), null);
if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) {
HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType);
HashMap<String, Object> resposeMap = JSONObject.parseObject(resposeJsonStr, HashMap.class);
// HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType);
boolean success = (boolean) resposeMap.get("success");
String msg = resposeMap.get("msg").toString();
if (success) {
HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
HashMap<String, Object> data = JSONObject.parseObject(JSONObject.toJSONString(resposeMap.get("data")), HashMap.class);
// HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
token = data.get("token").toString();
} else {
logger.error(msg);
@@ -122,14 +130,21 @@ public class ParseStaticThreshold {
BasicHeader authorization1 = new BasicHeader("Content-Type", "application/x-www-form-urlencoded");
String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build(), authorization, authorization1);
if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) {
HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType);
HashMap<String, Object> resposeMap = JSONObject.parseObject(resposeJsonStr, HashMap.class);
// HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType);
boolean success = (boolean) resposeMap.get("success");
String msg = resposeMap.get("msg").toString();
if (success) {
HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
HashMap<String, Object> data = JSONObject.parseObject(JSONObject.toJSONString(resposeMap.get("data")), HashMap.class);
// HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
Object list = data.get("list");
if (list != null) {
vsysIdList = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(list), vsysIDType);
String s = JSONObject.toJSONString(list);
List<DosVsysId> dosVsysIds = JSON.parseArray(JSONObject.toJSONString(list), DosVsysId.class);
// vsysIdList= JSONObject.parseObject(JSONObject.toJSONString(list), DosVsysId.class);
vsysIdList= (ArrayList)dosVsysIds;
// vsysIdList = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(list), vsysIDType);
logger.info("获取到vsysId {}条", vsysIdList.size());
} else {
logger.warn("vsysIdList为空");
@@ -171,14 +186,19 @@ public class ParseStaticThreshold {
BasicHeader authorization1 = new BasicHeader("Content-Type", "application/x-www-form-urlencoded");
String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build(), authorization, authorization1);
if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) {
HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType);
// HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType);
HashMap<String, Object> resposeMap = JSONObject.parseObject(resposeJsonStr,HashMap.class);
boolean success = (boolean) resposeMap.get("success");
String msg = resposeMap.get("msg").toString();
if (success) {
HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
// HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
HashMap<String, Object> data = JSONObject.parseObject(JSONObject.toJSONString(resposeMap.get("data")), HashMap.class);
Object list = data.get("list");
if (list != null) {
ArrayList<DosDetectionThreshold> thresholds = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(list), thresholdType);
// ArrayList<DosDetectionThreshold> thresholds = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(list), thresholdType);
// ArrayList<DosDetectionThreshold> thresholds = JSONObject.parseObject(JSONObject.toJSONString(list), ArrayList.class);
List<DosDetectionThreshold> dosDetectionThresholds = JSON.parseArray(JSONObject.toJSONString(list), DosDetectionThreshold.class);
ArrayList<DosDetectionThreshold> thresholds = (ArrayList)dosDetectionThresholds;
for (DosDetectionThreshold dosDetectionThreshold : thresholds) {
dosDetectionThreshold.setSuperiorIds(superiorIds);
vsysThresholds.add(dosDetectionThreshold);

View File

@@ -1,8 +1,9 @@
package com.zdjizhi.sink;
import com.alibaba.fastjson2.JSONObject;
import com.zdjizhi.common.CommonConfig;
import com.zdjizhi.common.DosEventLog;
import com.zdjizhi.utils.JsonMapper;
//import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.KafkaUtils;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -13,7 +14,8 @@ class DosEventSink {
static void dosEventOutputSink(SingleOutputStreamOperator<DosEventLog> dosEventLogOutputStream){
dosEventLogOutputStream
.filter(Objects::nonNull)
.map(JsonMapper::toJsonString)
// .map(JsonMapper::toJsonString)
.map(JSONObject::toJSONString)
.addSink(KafkaUtils.getKafkaSink(CommonConfig.KAFKA_OUTPUT_EVENT_TOPIC_NAME))
.setParallelism(CommonConfig.KAFKA_OUTPUT_EVENT_PARALLELISM);
}

View File

@@ -29,7 +29,7 @@ import java.util.Properties;
* @author 94976
*/
public class OutputStreamSink {
// private static final Logger logger = LoggerFactory.getLogger(OutputStreamSink.class);
// private static final Logger logger = LoggerFactory.getLogger(OutputStreamSink.class);
private static final Log logger = LogFactory.get();
public static OutputTag<DosMetricsLog> outputTag = new OutputTag<DosMetricsLog>("traffic server ip metrics"){};
@@ -46,7 +46,7 @@ public class OutputStreamSink {
}
private static SingleOutputStreamOperator<DosEventLog> getEventSinkStream(SingleOutputStreamOperator<DosSketchLog> middleStream){
DataStreamSource<Map<String, byte[]>> broadcastSource=null;
DataStreamSource<Map<String, String>> broadcastSource=null;
Properties nacosProperties = new Properties();
nacosProperties.put(PropertyKeyConst.SERVER_ADDR,CommonConfig.NACOS_SERVER_ADDR);
@@ -55,7 +55,7 @@ public class OutputStreamSink {
nacosProperties.setProperty(PropertyKeyConst.NAMESPACE, CommonConfig.NACOS_NAMESPACE);
if ("CLUSTER".equals(CommonConfig.CLUSTER_OR_SINGLE)){
broadcastSource = DosSketchSource.broadcastSource(nacosProperties,CommonConfig.HDFS_PATH);
broadcastSource = DosSketchSource.broadcastSource(nacosProperties);
}else {
broadcastSource= DosSketchSource.singleBroadcastSource(nacosProperties);
}
@@ -63,7 +63,7 @@ public class OutputStreamSink {
MapStateDescriptor<String,Map> descriptor =
new MapStateDescriptor<>("descriptorTest", Types.STRING, TypeInformation.of(Map.class));
BroadcastStream<Map<String, byte[]>> broadcast = broadcastSource.broadcast(descriptor);
BroadcastStream<Map<String, String>> broadcast = broadcastSource.broadcast(descriptor);
return middleStream
.connect(broadcast)

View File

@@ -1,5 +1,6 @@
package com.zdjizhi.sink;
import com.alibaba.fastjson2.JSONObject;
import com.zdjizhi.common.CommonConfig;
import com.zdjizhi.common.DosMetricsLog;
import com.zdjizhi.common.DosSketchLog;
@@ -14,7 +15,8 @@ class TrafficServerIpMetricsSink {
static void sideOutputMetricsSink(SingleOutputStreamOperator<DosSketchLog> outputStream){
DataStream<DosMetricsLog> sideOutput = outputStream.getSideOutput(outputTag);
sideOutput.map(JsonMapper::toJsonString).addSink(KafkaUtils.getKafkaSink(CommonConfig.KAFKA_OUTPUT_METRIC_TOPIC_NAME))
// sideOutput.map(JsonMapper::toJsonString).addSink(KafkaUtils.getKafkaSink(CommonConfig.KAFKA_OUTPUT_METRIC_TOPIC_NAME))
sideOutput.map(JSONObject::toJSONString).addSink(KafkaUtils.getKafkaSink(CommonConfig.KAFKA_OUTPUT_METRIC_TOPIC_NAME))
.setParallelism(CommonConfig.KAFKA_OUTPUT_METRIC_PARALLELISM);

View File

@@ -36,11 +36,11 @@ public class DosSketchSource {
}
public static DataStreamSource<Map<String, byte[]>> broadcastSource(Properties nacosProperties, String STORE_PATH){
return streamExeEnv.addSource(new HttpSource(nacosProperties, CommonConfig.NACOS_DATA_ID, CommonConfig.NACOS_GROUP, CommonConfig.NACOS_READ_TIMEOUT,STORE_PATH));
public static DataStreamSource<Map<String, String>> broadcastSource(Properties nacosProperties){
return streamExeEnv.addSource(new HttpSource(nacosProperties, CommonConfig.NACOS_DATA_ID, CommonConfig.NACOS_GROUP, CommonConfig.NACOS_READ_TIMEOUT));
}
public static DataStreamSource<Map<String, byte[]>> singleBroadcastSource(Properties nacosProperties){
public static DataStreamSource<Map<String, String>>singleBroadcastSource(Properties nacosProperties){
return streamExeEnv.addSource(new SingleHttpSource(nacosProperties, CommonConfig.NACOS_DATA_ID, CommonConfig.NACOS_GROUP, CommonConfig.NACOS_READ_TIMEOUT));
}
}

View File

@@ -2,11 +2,14 @@ package com.zdjizhi.source;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.io.IoUtil;
import cn.hutool.core.io.file.FileReader;
import cn.hutool.crypto.digest.DigestUtil;
import cn.hutool.json.JSONObject;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;
import com.fasterxml.jackson.databind.JavaType;
import com.google.common.base.Joiner;
import com.jayway.jsonpath.JsonPath;
@@ -27,12 +30,19 @@ import java.util.*;
import java.util.concurrent.Executor;
public class HttpSource extends RichHttpSourceFunction<Map<String, byte[]>> {
public class HttpSource extends RichHttpSourceFunction<Map<String, String>> {
private static final Logger logger = LoggerFactory.getLogger(HttpSource.class);
private static final String EXPR = "$.[?(@.version=='latest' && @.name in ['ip_v4_built_in','ip_v6_built_in','ip_v4_user_defined','ip_v6_user_defined'])].['name','sha256','format','path']";
private static Map<String, String> knowledgeMetaCache = new HashMap<>();
private static HashMap<String, String> knowledgeUpdateCache;
private static final int TRY_TIMES = 3;
private static HttpClientUtils2 httpClientUtils;
//连接nacos的配置
private Properties nacosProperties;
@@ -45,48 +55,53 @@ public class HttpSource extends RichHttpSourceFunction<Map<String, byte[]>> {
//nacos 连接超时时间
private long NACOS_READ_TIMEOUT;
//上传到hdfs的路径
private String STORE_PATH;
private ConfigService configService;
// private static JsonMapper jsonMapperInstance = JsonMapper.getInstance();
// private static JavaType listType = jsonMapperInstance.createCollectionType(List.class, KnowledgeLog.class);
private static Map<String, String> updateMap = new HashMap<>();
private static HashMap<String, byte[]> knowledgeFileCache;
private static Header header;
//运行状态cancel时置为false
private boolean isRunning = true;
//是否下发,默认不发送
private boolean isSending = false;
public HttpSource(Properties nacosProperties, String NACOS_DATA_ID, String NACOS_GROUP, long NACOS_READ_TIMEOUT, String storePath) {
public HttpSource(Properties nacosProperties, String NACOS_DATA_ID, String NACOS_GROUP, long NACOS_READ_TIMEOUT) {
this.nacosProperties = nacosProperties;
this.NACOS_DATA_ID = NACOS_DATA_ID;
this.NACOS_GROUP = NACOS_GROUP;
this.NACOS_READ_TIMEOUT = NACOS_READ_TIMEOUT;
this.STORE_PATH = storePath;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
httpClientUtils = new HttpClientUtils2();
//初始化元数据缓存
updateMap = new HashMap<>(16);
knowledgeMetaCache = new HashMap<>(16);
//初始化定位库缓存
knowledgeFileCache = new HashMap<>(16);
knowledgeUpdateCache = new HashMap<>(16);
header = new BasicHeader("token", CommonConfig.HOS_TOKEN);
//连接nacos配置
try {
configService = NacosFactory.createConfigService(nacosProperties);
}catch (NacosException e){
logger.error("Get Schema config from Nacos error,The exception message is :{}", e.getMessage());
}
//初始化知识库
initKnowledge();
logger.info("连接nacos" + nacosProperties.getProperty(PropertyKeyConst.SERVER_ADDR));
configService = NacosFactory.createConfigService(nacosProperties);
}
@Override
public void run(SourceContext ctx) throws Exception {
// ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
String config = configService.getConfig(NACOS_DATA_ID, NACOS_GROUP, NACOS_READ_TIMEOUT);
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String format = formatter.format(new Date());
logger.info(format + "receive config from nacos:" + config);
System.out.println(format + "receive config from nacos:" + config);
if (StringUtil.isNotBlank(config)) {
ArrayList<Object> metaList = JsonPath.parse(config).read(EXPR);
loadKnowledge(metaList);
if (!knowledgeUpdateCache.isEmpty()){
ctx.collect(knowledgeUpdateCache);
knowledgeUpdateCache.clear();
}
// }
configService.addListener(NACOS_DATA_ID, NACOS_GROUP, new Listener() {
@@ -101,78 +116,141 @@ public class HttpSource extends RichHttpSourceFunction<Map<String, byte[]>> {
logger.info("receive update config:" + configMsg);
if (StringUtil.isNotBlank(configMsg)) {
ArrayList<Object> metaList = JsonPath.parse(configMsg).read(EXPR);
if (metaList.size() >= 1) {
if (metaList.size() > 0) {
for (Object metadata : metaList) {
JSONObject knowledgeJson = new JSONObject(metadata, false, true);
String fileName = Joiner.on(CommonConfig.LOCATION_SEPARATOR).useForNull("").join(knowledgeJson.getStr("name"),
knowledgeJson.getStr("format"));
String sha256 = knowledgeJson.getStr("sha256");
String filePath = knowledgeJson.getStr("path");
if (!sha256.equals(updateMap.get(fileName))) {
updateMap.put(fileName, sha256);
updateKnowledge(fileName, filePath);
if (!sha256.equals(knowledgeMetaCache.get(fileName))) {
knowledgeMetaCache.put(fileName, sha256);
updateKnowledge(fileName, filePath,sha256);
}
}
ctx.collect(knowledgeFileCache);
if (!knowledgeUpdateCache.isEmpty()){
ctx.collect(knowledgeUpdateCache);
knowledgeUpdateCache.clear();
}
}
}
} catch (Exception e) {
logger.error("监听nacos配置失败", e);
}
System.out.println(configMsg);
}
});
while (isRunning) {
Thread.sleep(10000);
try {
Thread.sleep(10000);
}catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private void loadKnowledge(ArrayList<Object> metaList) {
InputStream inputStream = null;
private void initKnowledge(){
String configMsg = "";
try {
if (metaList.size() >= 1) {
configMsg=configService.getConfig(NACOS_DATA_ID, NACOS_GROUP, NACOS_READ_TIMEOUT);
} catch (NacosException e) {
logger.error("从Nacos获取知识库元数据配置文件异常异常信息为:{}", e.getMessage());
}
if (StringUtil.isNotBlank(configMsg)){
ArrayList<Object> metaList = JsonPath.parse(configMsg).read(EXPR);
if (metaList.size() > 0) {
for (Object metadata : metaList) {
JSONObject knowledgeJson = new JSONObject(metadata, false, true);
String fileName = Joiner.on(CommonConfig.LOCATION_SEPARATOR).useForNull("").join(knowledgeJson.getStr("name"),
knowledgeJson.getStr("format"));
String sha256 = knowledgeJson.getStr("sha256");
String filePath = knowledgeJson.getStr("path");
Header header = new BasicHeader("token", CommonConfig.HOS_TOKEN);
HttpClientUtils2 httpClientUtils = new HttpClientUtils2();
inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header);
updateMap.put(fileName, sha256);
knowledgeFileCache.put(fileName, IOUtils.toByteArray(inputStream));
byte[] localFileByte = getLocalFile(fileName);
String localFileSha256Hex = DigestUtil.sha256Hex(localFileByte);
if (sha256.equals(localFileSha256Hex)){
logger.info("本地文件{}的sha256为:{} ,Nacos内记录为:{} ,sha256相等", fileName, localFileSha256Hex, sha256);
knowledgeMetaCache.put(fileName, sha256);
}else {
logger.info("本地文件{}的sha256为:{} ,Nacos内记录为:{} ,sha256不相等更新本地文件及缓存", fileName, localFileSha256Hex, sha256);
updateKnowledge(fileName,filePath,sha256);
}
}
}
}
}
private void updateKnowledge(String fileName, String filePath,String sha256) {
InputStream inputStream = null;
int retryNum = 0;
try {
while (retryNum < TRY_TIMES){
inputStream = httpClientUtils.httpGetInputStream(filePath, 90000, header);
if (inputStream !=null){
byte[] downloadBytes = IOUtils.toByteArray(inputStream);
String downloadFileSha256Hex = DigestUtil.sha256Hex(downloadBytes);
if (sha256.equals(downloadFileSha256Hex)&& downloadBytes.length > 0 ){
logger.info("通过HOS下载{}的sha256为:{} ,Nacos内记录为:{} ,sha256相等", fileName, sha256);
boolean updateStatus = updateLocalFile(fileName, downloadBytes);
if (updateStatus){
knowledgeMetaCache.put(fileName,sha256);
knowledgeUpdateCache.put(fileName, sha256);
retryNum = TRY_TIMES;
}else {
retryNum++;
//避免频繁请求HOS
Thread.sleep(10000);
}
}else {
logger.error("通过HOS下载{}的sha256为:{} ,Nacos内记录为:{} ,sha256不相等 开始第{}次重试下载文件", fileName, downloadFileSha256Hex, sha256, retryNum);
retryNum++;
//避免频繁请求HOS
Thread.sleep(10000);
}
}
}
} catch (IOException ioException) {
ioException.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
IOUtils.closeQuietly(inputStream);
}
}
private void updateKnowledge(String fileName, String filePath) {
InputStream inputStream = null;
private boolean updateLocalFile(String fileName,byte[] downloadBytes) {
FileOutputStream outputStream = null;
boolean updateStatus = false;
try {
Header header = new BasicHeader("token", CommonConfig.HOS_TOKEN);
HttpClientUtils2 httpClientUtils = new HttpClientUtils2();
inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header);
byte[] bytes = IOUtils.toByteArray(inputStream);
HdfsUtils.uploadFileByBytes(CommonConfig.HDFS_PATH + fileName, bytes);
knowledgeFileCache.put(fileName, bytes);
} catch (IOException ioException) {
ioException.printStackTrace();
HdfsUtils.uploadFileByBytes(CommonConfig.HDFS_PATH + fileName, downloadBytes);
updateStatus=true;
} catch (IOException ioe) {
logger.error("更新本地文件{}时发生IO异常,异常信息为:", fileName, ioe.getMessage());
ioe.printStackTrace();
} catch (RuntimeException e) {
logger.error("更新本地文件{}时发生异常,异常信息为:", fileName, e.getMessage());
e.printStackTrace();
} finally {
IOUtils.closeQuietly(inputStream);
IOUtils.closeQuietly(outputStream);
}
return updateStatus;
}
private static byte[] getLocalFile(String name) {
byte[] fileBytes = null;
try {
fileBytes = HdfsUtils.getFileBytes(CommonConfig.HDFS_PATH + name) ;
} catch (RuntimeException | IOException e) {
logger.error("IpLookupUtils download MMDB files error, message is:" + e.getMessage());
e.printStackTrace();
}
return fileBytes;
}
@Override
public void cancel() {
this.isRunning = false;

View File

@@ -2,11 +2,14 @@ package com.zdjizhi.source;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.io.IoUtil;
import cn.hutool.core.io.file.FileReader;
import cn.hutool.crypto.digest.DigestUtil;
import cn.hutool.json.JSONObject;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;
import com.fasterxml.jackson.databind.JavaType;
import com.google.common.base.Joiner;
import com.jayway.jsonpath.JsonPath;
@@ -23,13 +26,22 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.Executor;
public class SingleHttpSource extends RichHttpSourceFunction<Map<String, byte[]>> {
public class SingleHttpSource extends RichHttpSourceFunction<Map<String, String>> {
private static final Logger logger = LoggerFactory.getLogger(SingleHttpSource.class);
private static HashMap<String, byte[]> knowledgeFileCache;
private static final String EXPR = "$.[?(@.version=='latest' && @.name in ['ip_v4_built_in','ip_v6_built_in','ip_v4_user_defined','ip_v6_user_defined'])].['name','sha256','format','path']";
private static Map<String, String> knowledgeMetaCache = new HashMap<>();
private static HashMap<String, String> knowledgeUpdateCache;
private static final int TRY_TIMES = 3;
private Properties nacosProperties;
@@ -39,20 +51,16 @@ public class SingleHttpSource extends RichHttpSourceFunction<Map<String, byte[]>
private long NACOS_READ_TIMEOUT;
private static String STORE_PATH;
private static HttpClientUtils2 httpClientUtils ;
private ConfigService configService;
// private static JsonMapper jsonMapperInstance = JsonMapper.getInstance();
// private static JavaType listType = jsonMapperInstance.createCollectionType(List.class, KnowledgeLog.class);
private static final String EXPR = "$.[?(@.version=='latest' && @.name in ['ip_v4_built_in','ip_v6_built_in','ip_v4_user_defined','ip_v6_user_defined'])].['name','sha256','format','path']";
private static Map<String, String> updateMap = new HashMap<>();
private static Header header;
private boolean isRunning = true;
public SingleHttpSource(Properties nacosProperties, String NACOS_DATA_ID, String NACOS_GROUP, long NACOS_READ_TIMEOUT) {
this.nacosProperties = nacosProperties;
this.NACOS_DATA_ID = NACOS_DATA_ID;
@@ -65,33 +73,32 @@ public class SingleHttpSource extends RichHttpSourceFunction<Map<String, byte[]>
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
logger.info("连接nacos" + nacosProperties.getProperty(PropertyKeyConst.SERVER_ADDR));
configService = NacosFactory.createConfigService(nacosProperties);
httpClientUtils = new HttpClientUtils2();
//初始化元数据缓存
updateMap = new HashMap<>(16);
knowledgeMetaCache = new HashMap<>(16);
//初始化定位库缓存
knowledgeFileCache = new HashMap<>(16);
knowledgeUpdateCache = new HashMap<>(16);
header = new BasicHeader("token", CommonConfig.HOS_TOKEN);
//连接nacos配置
try {
configService = NacosFactory.createConfigService(nacosProperties);
}catch (NacosException e){
logger.error("Get Schema config from Nacos error,The exception message is :{}", e.getMessage());
}
//初始化知识库
initKnowledge();
logger.info("连接nacos" + nacosProperties.getProperty(PropertyKeyConst.SERVER_ADDR));
}
@Override
public void run(SourceContext ctx) throws Exception {
// ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
String config = configService.getConfig(NACOS_DATA_ID, NACOS_GROUP, NACOS_READ_TIMEOUT);
// List<CustomFile> customFiles = new ArrayList<>();
if (StringUtil.isNotBlank(config)) {
ArrayList<Object> metaList = JsonPath.parse(config).read(EXPR);
loadKnowledge(metaList);
if (!knowledgeUpdateCache.isEmpty()){
ctx.collect(knowledgeUpdateCache);
knowledgeUpdateCache.clear();
}
// if (StringUtil.isNotBlank(config)) {
// List<KnowledgeLog> knowledgeLogListList = jsonMapperInstance.fromJson(config, listType);
// if (knowledgeLogListList.size()>=1){
// for (KnowledgeLog knowledgeLog : knowledgeLogListList) {
// String name = knowledgeLog.getName().concat(".").concat(knowledgeLog.getFormat());
// String sha256 = knowledgeLog.getSha256();
// updateMap.put(name,sha256);
// }
// }
// }
configService.addListener(NACOS_DATA_ID, NACOS_GROUP, new Listener() {
@Override
@@ -105,20 +112,22 @@ public class SingleHttpSource extends RichHttpSourceFunction<Map<String, byte[]>
logger.info("receive update config:" + configMsg);
if (StringUtil.isNotBlank(configMsg)) {
ArrayList<Object> metaList = JsonPath.parse(configMsg).read(EXPR);
if (metaList.size() >= 1) {
if (metaList.size() > 0) {
for (Object metadata : metaList) {
JSONObject knowledgeJson = new JSONObject(metadata, false, true);
String fileName = Joiner.on(CommonConfig.LOCATION_SEPARATOR).useForNull("").join(knowledgeJson.getStr("name"),
knowledgeJson.getStr("format"));
String sha256 = knowledgeJson.getStr("sha256");
String filePath = knowledgeJson.getStr("path");
if (!sha256.equals(updateMap.get(fileName))) {
updateMap.put(fileName, sha256);
updateKnowledge(fileName, filePath);
if (!sha256.equals(knowledgeMetaCache.get(fileName))) {
knowledgeMetaCache.put(fileName, sha256);
updateKnowledge(fileName, filePath,sha256);
}
}
ctx.collect(knowledgeFileCache);
if (!knowledgeUpdateCache.isEmpty()){
ctx.collect(knowledgeUpdateCache);
knowledgeUpdateCache.clear();
}
}
}
@@ -130,85 +139,129 @@ public class SingleHttpSource extends RichHttpSourceFunction<Map<String, byte[]>
});
while (isRunning) {
Thread.sleep(10000);
try {
Thread.sleep(10000);
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
// private CustomFile loadKnowledge(String fileName, String filePath) {
// InputStream inputStream = null;
// FileOutputStream outputStream = null;
// CustomFile customFile = new CustomFile();
// try {
// customFile.setFileName(fileName);
// Header header = new BasicHeader("token", CommonConfig.HOS_TOKEN);
// HttpClientUtils2 httpClientUtils = new HttpClientUtils2();
// inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header);
// FileUtil.mkdir(CommonConfig.DOWNLOAD_PATH);
// File file = new File(CommonConfig.DOWNLOAD_PATH.concat(File.separator).concat(fileName));
// outputStream = new FileOutputStream(file);
// byte[] bytes = IOUtils.toByteArray(inputStream);
// customFile.setContent(bytes);
// inputStream = new ByteArrayInputStream(customFile.getContent());
// IoUtil.copy(inputStream, outputStream);
//
// } catch (IOException ioException) {
// ioException.printStackTrace();
// } finally {
// IOUtils.closeQuietly(inputStream);
// IOUtils.closeQuietly(outputStream);
// }
// return customFile;
// }
private void loadKnowledge(ArrayList<Object> metaList) {
InputStream inputStream = null;
try {
if (metaList.size() >= 1) {
for (Object metadata : metaList) {
JSONObject knowledgeJson = new JSONObject(metadata, false, true);
String fileName = Joiner.on(CommonConfig.LOCATION_SEPARATOR).useForNull("").join(knowledgeJson.getStr("name"),
knowledgeJson.getStr("format"));
String sha256 = knowledgeJson.getStr("sha256");
String filePath = knowledgeJson.getStr("path");
Header header = new BasicHeader("token", CommonConfig.HOS_TOKEN);
HttpClientUtils2 httpClientUtils = new HttpClientUtils2();
inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header);
updateMap.put(fileName, sha256);
knowledgeFileCache.put(fileName, IOUtils.toByteArray(inputStream));
private void initKnowledge(){
String configMsg = "";
try {
configMsg=configService.getConfig(NACOS_DATA_ID, NACOS_GROUP, NACOS_READ_TIMEOUT);
} catch (NacosException e) {
logger.error("从Nacos获取知识库元数据配置文件异常异常信息为:{}", e.getMessage());
}
if (StringUtil.isNotBlank(configMsg)){
ArrayList<Object> metaList = JsonPath.parse(configMsg).read(EXPR);
if (metaList.size() > 0) {
for (Object metadata : metaList) {
JSONObject knowledgeJson = new JSONObject(metadata, false, true);
String fileName = Joiner.on(CommonConfig.LOCATION_SEPARATOR).useForNull("").join(knowledgeJson.getStr("name"),
knowledgeJson.getStr("format"));
String sha256 = knowledgeJson.getStr("sha256");
String filePath = knowledgeJson.getStr("path");
byte[] localFileByte = getLocalFile(fileName);
String localFileSha256Hex = DigestUtil.sha256Hex(localFileByte);
if (sha256.equals(localFileSha256Hex)){
logger.info("本地文件{}的sha256为:{} ,Nacos内记录为:{} ,sha256相等", fileName, localFileSha256Hex, sha256);
knowledgeMetaCache.put(fileName, sha256);
}else {
logger.info("本地文件{}的sha256为:{} ,Nacos内记录为:{} ,sha256不相等更新本地文件及缓存", fileName, localFileSha256Hex, sha256);
updateKnowledge(fileName,filePath,sha256);
}
}
}
}
} catch (IOException ioException) {
ioException.printStackTrace();
} finally {
IOUtils.closeQuietly(inputStream);
}
}
private void updateKnowledge(String fileName, String filePath) {
private void updateKnowledge(String fileName, String filePath,String sha256) {
InputStream inputStream = null;
FileOutputStream outputStream = null;
int retryNum = 0;
try {
while (retryNum < TRY_TIMES){
inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header);
if (inputStream !=null){
byte[] downloadBytes = IOUtils.toByteArray(inputStream);
String downloadFileSha256Hex = DigestUtil.sha256Hex(downloadBytes);
if (sha256.equals(downloadFileSha256Hex)&& downloadBytes.length > 0 ){
logger.info("通过HOS下载{}的sha256为:{} ,Nacos内记录为:{} ,sha256相等", fileName, sha256);
boolean updateStatus = updateLocalFile(fileName, downloadBytes);
if (updateStatus){
knowledgeMetaCache.put(fileName,sha256);
knowledgeUpdateCache.put(fileName, sha256);
retryNum = TRY_TIMES;
}else {
retryNum++;
//避免频繁请求HOS
Thread.sleep(10000);
}
// isSending = true;
}else {
logger.error("通过HOS下载{}的sha256为:{} ,Nacos内记录为:{} ,sha256不相等 开始第{}次重试下载文件", fileName, downloadFileSha256Hex, sha256, retryNum);
retryNum++;
//避免频繁请求HOS
Thread.sleep(10000);
}
}
}
} catch (IOException ioException) {
ioException.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
IOUtils.closeQuietly(inputStream);
}
}
private boolean updateLocalFile(String fileName,byte[] downloadBytes) {
FileOutputStream outputStream = null;
boolean updateStatus = false;
try {
Header header = new BasicHeader("token", CommonConfig.HOS_TOKEN);
HttpClientUtils2 httpClientUtils = new HttpClientUtils2();
inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header);
FileUtil.mkdir(CommonConfig.DOWNLOAD_PATH);
File file = new File(CommonConfig.DOWNLOAD_PATH.concat(File.separator).concat(fileName));
outputStream = new FileOutputStream(file);
byte[] bytes = IOUtils.toByteArray(inputStream);
knowledgeFileCache.put(fileName, bytes);
inputStream=new ByteArrayInputStream(bytes);
IoUtil.copy(inputStream, outputStream);
} catch (IOException ioException) {
ioException.printStackTrace();
IoUtil.copy(new ByteArrayInputStream(downloadBytes), outputStream);
updateStatus=true;
} catch (IOException ioe) {
logger.error("更新本地文件{}时发生IO异常,异常信息为:", fileName, ioe.getMessage());
ioe.printStackTrace();
} catch (RuntimeException e) {
logger.error("更新本地文件{}时发生异常,异常信息为:", fileName, e.getMessage());
e.printStackTrace();
} finally {
IOUtils.closeQuietly(inputStream);
IOUtils.closeQuietly(outputStream);
}
return updateStatus;
}
private static byte[] getLocalFile(String name) {
byte[] fileBytes = null;
try {
fileBytes=new FileReader(CommonConfig.DOWNLOAD_PATH + name).readBytes();
} catch (RuntimeException e) {
logger.error("IpLookupUtils download MMDB files error, message is:" + e.getMessage());
e.printStackTrace();
}
return fileBytes;
}
@Override
public void cancel() {
this.isRunning = false;

View File

@@ -1,11 +1,14 @@
package com.zdjizhi.utils;
import cn.hutool.core.io.file.FileReader;
import cn.hutool.crypto.digest.DigestUtil;
import com.zdjizhi.common.CommonConfig;
import com.zdjizhi.common.CustomFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
@@ -86,20 +89,126 @@ public class IpUtils {
}
}
public static void updateIpLook(Map<String, byte[]> knowledgeFileCache){
public static void updateIpLook(Map<String, String> knowledgeFileCache){
try{
IpLookupV2.Builder builder = new IpLookupV2.Builder(false);
ipLookup= builder.loadDataFileV4(new ByteArrayInputStream(knowledgeFileCache.get("ip_v4_built_in.mmdb")))
.loadDataFileV6(new ByteArrayInputStream(knowledgeFileCache.get("ip_v6_built_in.mmdb")))
.loadDataFilePrivateV4(new ByteArrayInputStream(knowledgeFileCache.get("ip_v4_user_defined.mmdb")))
.loadDataFilePrivateV6(new ByteArrayInputStream(knowledgeFileCache.get("ip_v6_user_defined.mmdb")))
.build();
if ("CLUSTER".equals(CommonConfig.CLUSTER_OR_SINGLE)) {
byte[] ipv4BuiltBytes = HdfsUtils.getFileBytes(CommonConfig.HDFS_PATH + "ip_v4_built_in.mmdb");
if (ipv4BuiltBytes!=null){
if (knowledgeFileCache.containsKey("ip_v4_built_in.mmdb")){
String sha256 = knowledgeFileCache.get("ip_v4_built_in.mmdb");
byte[] localFileByte = getLocalFile("ip_v4_built_in.mmdb");
String localFileSha256Hex = DigestUtil.sha256Hex(localFileByte);
if (sha256.equals(localFileSha256Hex)){
builder.loadDataFileV4(new ByteArrayInputStream(ipv4BuiltBytes));
}
}
}
byte[] ipv6BuiltBytes = HdfsUtils.getFileBytes(CommonConfig.HDFS_PATH + "ip_v6_built_in.mmdb");
if (ipv6BuiltBytes!=null){
if (knowledgeFileCache.containsKey("ip_v6_built_in.mmdb")) {
String sha256 = knowledgeFileCache.get("ip_v6_built_in.mmdb");
byte[] localFileByte = getLocalFile("ip_v6_built_in.mmdb");
String localFileSha256Hex = DigestUtil.sha256Hex(localFileByte);
if (sha256.equals(localFileSha256Hex)) {
builder.loadDataFileV6(new ByteArrayInputStream(ipv6BuiltBytes));
}
}
}
byte[] ipv4UserBytes = HdfsUtils.getFileBytes(CommonConfig.HDFS_PATH + "ip_v4_user_defined.mmdb");
if (ipv4UserBytes!=null){
if (knowledgeFileCache.containsKey("ip_v4_user_defined.mmdb")) {
String sha256 = knowledgeFileCache.get("ip_v4_user_defined.mmdb");
byte[] localFileByte = getLocalFile("ip_v4_user_defined.mmdb");
String localFileSha256Hex = DigestUtil.sha256Hex(localFileByte);
if (sha256.equals(localFileSha256Hex)) {
builder.loadDataFilePrivateV4(new ByteArrayInputStream(ipv4UserBytes));
}
}
}
byte[] ipv6UserBytes = HdfsUtils.getFileBytes(CommonConfig.HDFS_PATH + "ip_v6_user_defined.mmdb");
if (ipv6UserBytes!=null){
if (knowledgeFileCache.containsKey("ip_v6_user_defined.mmdb")) {
String sha256 = knowledgeFileCache.get("ip_v6_user_defined.mmdb");
byte[] localFileByte = getLocalFile("ip_v6_user_defined.mmdb");
String localFileSha256Hex = DigestUtil.sha256Hex(localFileByte);
if (sha256.equals(localFileSha256Hex)) {
builder.loadDataFilePrivateV6(new ByteArrayInputStream(ipv6UserBytes));
}
}
}
}else if ("SINGLE".equals(CommonConfig.CLUSTER_OR_SINGLE)){
byte[] ipv4BuiltBytes = FileByteUtils.getFileBytes(CommonConfig.DOWNLOAD_PATH + "ip_v4_built_in.mmdb");
if (ipv4BuiltBytes!=null){
if (knowledgeFileCache.containsKey("ip_v4_built_in.mmdb")){
String sha256 = knowledgeFileCache.get("ip_v4_built_in.mmdb");
byte[] localFileByte = getLocalFile("ip_v4_built_in.mmdb");
String localFileSha256Hex = DigestUtil.sha256Hex(localFileByte);
if (sha256.equals(localFileSha256Hex)){
builder.loadDataFileV4(new ByteArrayInputStream(ipv4BuiltBytes));
}
}
}
byte[] ipv6BuiltBytes = FileByteUtils.getFileBytes(CommonConfig.DOWNLOAD_PATH + "ip_v6_built_in.mmdb");
if (ipv6BuiltBytes!=null){
if (knowledgeFileCache.containsKey("ip_v6_built_in.mmdb")) {
String sha256 = knowledgeFileCache.get("ip_v6_built_in.mmdb");
byte[] localFileByte = getLocalFile("ip_v6_built_in.mmdb");
String localFileSha256Hex = DigestUtil.sha256Hex(localFileByte);
if (sha256.equals(localFileSha256Hex)) {
builder.loadDataFileV6(new ByteArrayInputStream(ipv6BuiltBytes));
}
}
}
byte[] ipv4UserBytes = FileByteUtils.getFileBytes(CommonConfig.DOWNLOAD_PATH + "ip_v4_user_defined.mmdb");
if (ipv4UserBytes!=null){
if (knowledgeFileCache.containsKey("ip_v4_user_defined.mmdb")) {
String sha256 = knowledgeFileCache.get("ip_v4_user_defined.mmdb");
byte[] localFileByte = getLocalFile("ip_v4_user_defined.mmdb");
String localFileSha256Hex = DigestUtil.sha256Hex(localFileByte);
if (sha256.equals(localFileSha256Hex)) {
builder.loadDataFilePrivateV4(new ByteArrayInputStream(ipv4UserBytes));
}
}
}
byte[] ipv6UserBytes = FileByteUtils.getFileBytes(CommonConfig.DOWNLOAD_PATH + "ip_v6_user_defined.mmdb");
if (ipv6UserBytes!=null){
if (knowledgeFileCache.containsKey("ip_v6_user_defined.mmdb")) {
String sha256 = knowledgeFileCache.get("ip_v6_user_defined.mmdb");
byte[] localFileByte = getLocalFile("ip_v6_user_defined.mmdb");
String localFileSha256Hex = DigestUtil.sha256Hex(localFileByte);
if (sha256.equals(localFileSha256Hex)) {
builder.loadDataFilePrivateV6(new ByteArrayInputStream(ipv6UserBytes));
}
}
}
}
ipLookup = builder.build();
}catch (Exception e){
LOG.error("加载失败",e);
}
}
private static byte[] getLocalFile(String name) {
byte[] fileBytes = null;
try {
fileBytes = "CLUSTER".equals(CommonConfig.CLUSTER_OR_SINGLE) ?
HdfsUtils.getFileBytes(CommonConfig.HDFS_PATH + name) :
new FileReader(CommonConfig.DOWNLOAD_PATH + name).readBytes();
} catch (RuntimeException | IOException e) {
e.printStackTrace();
}
return fileBytes;
}
public static void main(String[] args) {
System.out.println(ipLookup.countryLookup("49.7.115.37"));

View File

@@ -0,0 +1,8 @@
--DosSketchLog
{common_sled_ip='null', common_data_center='null', sketch_start_time=1686277141, sketch_duration=59, attack_type='DNS Flood', source_ip='23.91.128.115', destination_ip='102.219.30.33', sketch_sessions=945, sketch_packets=945, sketch_bytes=446370, vsys_id=23}
{common_sled_ip='null', common_data_center='null', sketch_start_time=1686277205, sketch_duration=86, attack_type='DNS Flood', source_ip='172.217.160.68', destination_ip='10.113.83.88', sketch_sessions=730, sketch_packets=730, sketch_bytes=344575, vsys_id=1}
{common_sled_ip='null', common_data_center='null', sketch_start_time=1686277244, sketch_duration=47, attack_type='DNS Flood', source_ip='45.135.144.112', destination_ip='42.62.192.132', sketch_sessions=0, sketch_packets=0, sketch_bytes=47, vsys_id=1}
--DosDetectionThreshold
{profileId='6091', attackType='DNS Flood', serverIpList=[113.113.83.213, 42.62.192.132/28, 10.113.83.1/25, 102.219.30.33/29], serverIpAddr='null', packetsPerSec=1, bitsPerSec=1, sessionsPerSec=1, isValid=1, vsysId=1, superiorIds=[4, 12, 5, 27]}
{profileId='5679', attackType='DNS Flood', serverIpList=[102.219.30.33], serverIpAddr='null', packetsPerSec=500, bitsPerSec=1000000, sessionsPerSec=100000, isValid=1, vsysId=23, superiorIds=[4, 5]}

View File

@@ -0,0 +1,6 @@
--DosSketchLog
{common_sled_ip='null', common_data_center='null', sketch_start_time=1686277232, sketch_duration=59, attack_type='ICMP Flood', source_ip='45.170.244.25', destination_ip='24.152.57.56', sketch_sessions=499, sketch_packets=499, sketch_bytes=111970, vsys_id=1}
--DosDetectionThreshold
{profileId='6093', attackType='ICMP Flood', serverIpList=[31.131.80.88/29, 24.152.57.56/29, 47.93.59.1/25], serverIpAddr='null', packetsPerSec=210, bitsPerSec=0, sessionsPerSec=0, isValid=1, vsysId=1, superiorIds=[4, 12, 5, 27]}

View File

@@ -0,0 +1,7 @@
--DosSketchLog
{common_sled_ip='null', common_data_center='null', sketch_start_time=1685003938, sketch_duration=63714, attack_type='TCP SYN Flood', source_ip='5.32.144.55', destination_ip='45.188.134.11', sketch_sessions=0, sketch_packets=0, sketch_bytes=4195, vsys_id=1}
{common_sled_ip='null', common_data_center='null', sketch_start_time=1686277234, sketch_duration=57, attack_type='TCP SYN Flood', source_ip='18.65.148.128', destination_ip='23.200.74.224', sketch_sessions=54, sketch_packets=54, sketch_bytes=73427, vsys_id=1}
--DosDetectionThreshold
{profileId='6095', attackType='TCP SYN Flood', serverIpList=[23.200.74.224, 45.188.134.11/29, 41.183.0.15/29, 41.183.0.16/30], serverIpAddr='null', packetsPerSec=1, bitsPerSec=1, sessionsPerSec=1, isValid=1, vsysId=1, superiorIds=[5, 4, 12, 27]}

View File

@@ -0,0 +1,8 @@
--DosSketchLog
{common_sled_ip='null', common_data_center='null', sketch_start_time=1686277291, sketch_duration=0, attack_type='UDP Flood', source_ip='121.14.89.209', destination_ip='192.168.50.11', sketch_sessions=0, sketch_packets=0, sketch_bytes=0, vsys_id=1}
{common_sled_ip='null', common_data_center='null', sketch_start_time=1686277233, sketch_duration=58, attack_type='UDP Flood', source_ip='192.168.50.56,192.168.50.34,192.168.50.11,192.168.50.33,192.168.50.55,192.168.50.58,192.168.50.36,192.168.50.14,192.168.50.35,192.168.50.13,192.168.50.57,192.168.50.30,192.168.50.51,192.168.50.54,192.168.50.10,192.168.50.32,192.168.50.53,192.168.50.31,192.168.50.16,192.168.50.38,192.168.50.15,192.168.50.37,192.168.50.18,192.168.50.17,192.168.50.50,192.168.50.45,192.168.50.23,192.168.50.22,192.168.50.44,192.168.50.25,192.168.50.47,192.168.50.46,192.168.50.24,192.168.50.63,192.168.50.41,192.168.50.40,192.168.50.62,192.168.50.43,192.168.50.21,192.168.50.20,192.168.50.42,192.168.50.27,192.168.50.26,192.168.50.48,192.168.50.28,192.168.50.61,192.168.50.60', destination_ip='121.14.89.209', sketch_sessions=297, sketch_packets=297, sketch_bytes=371404, vsys_id=1}
--DosDetectionThreshold
{profileId='5333', attackType='UDP Flood', serverIpList=[192.168.50.11, 192.168.50.12], serverIpAddr='null', packetsPerSec=50, bitsPerSec=50, sessionsPerSec=50, isValid=1, vsysId=1, superiorIds=[4, 12, 5, 27]}

View File

@@ -0,0 +1,237 @@
package com.zdjizhi.etl;
import com.zdjizhi.common.DosDetectionThreshold;
import com.zdjizhi.common.DosEventLog;
import com.zdjizhi.common.DosSketchLog;
import com.zdjizhi.utils.IpUtils;
import com.zdjizhi.utils.NacosUtils;
import com.zdjizhi.utils.SnowflakeId;
import com.zdjizhi.utils.StringUtil;
import org.apache.commons.lang3.StringUtils;
import org.junit.Test;
import java.math.BigDecimal;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.HashSet;
public class DosDetectionTest {
private final static NumberFormat PERCENT_INSTANCE = NumberFormat.getPercentInstance();
private final static int BASELINE_SIZE = 144;
private final static int STATIC_CONDITION_TYPE = 1;
private final static int BASELINE_CONDITION_TYPE = 2;
private final static int SENSITIVITY_CONDITION_TYPE = 3;
private final static String SESSIONS_TAG = "sessions";
private final static String PACKETS_TAG = "packets";
private final static String BITS_TAG = "bits";
@Test
public void dosDetectionTest(){
DosDetectionThreshold dosDetectionThreshold = new DosDetectionThreshold();
ArrayList<String> serverIpList = new ArrayList<>();
serverIpList.add("192.168.50.11");
serverIpList.add("192.168.50.1/24");
serverIpList.add("FC::12:0:0/54");
serverIpList.add("FC::12:0:0");
dosDetectionThreshold.setProfileId(4437);
dosDetectionThreshold.setAttackType("DNS Flood");
dosDetectionThreshold.setServerIpList(serverIpList);
dosDetectionThreshold.setSessionsPerSec(1);
dosDetectionThreshold.setPacketsPerSec(1);
dosDetectionThreshold.setBitsPerSec(100000);
dosDetectionThreshold.setIsValid(1);
dosDetectionThreshold.setSuperiorIds(new Integer[]{5,4,12,27});
DosSketchLog dosSketchLog = new DosSketchLog ();
dosSketchLog.setSketch_sessions(68);
dosSketchLog.setSketch_packets(68);
dosSketchLog.setSketch_bytes(285820);//185.82
dosSketchLog.setVsys_id(1);
dosSketchLog.setAttack_type("ICMP Flood");
dosSketchLog.setSource_ip("45.170.244.25");
dosSketchLog.setDestination_ip("24.152.57.56");
//静态阈值获取
long sessionBase = dosDetectionThreshold.getSessionsPerSec();
long pktBase=dosDetectionThreshold.getPacketsPerSec();
long bitBase=dosDetectionThreshold.getBitsPerSec();
//基于速率进行计算
long diffSession = dosSketchLog.getSketch_sessions() - sessionBase;
long diffPkt = dosSketchLog.getSketch_packets() - pktBase;
long diffByte = dosSketchLog.getSketch_bytes() - bitBase;
Double diffSessionPercent = getDiffPercent(diffSession, sessionBase)*100;
Double diffPktPercent = getDiffPercent(diffPkt, pktBase)*100;
Double diffBitPercent = getDiffPercent(diffByte, bitBase)*100;
long profileId = 0;
DosEventLog result =null;
if (diffSessionPercent >= diffPktPercent && diffSessionPercent >= diffBitPercent){
profileId = dosDetectionThreshold.getProfileId();
result= getDosEventLog(dosSketchLog, sessionBase, diffSession, profileId, STATIC_CONDITION_TYPE, SESSIONS_TAG);
System.out.println(result);
}else if (diffPktPercent >= diffSessionPercent && diffPktPercent >= diffBitPercent){
profileId = dosDetectionThreshold.getProfileId();
result = getDosEventLog(dosSketchLog, pktBase, diffPkt,profileId, STATIC_CONDITION_TYPE, PACKETS_TAG);
System.out.println(result);
}else if (diffBitPercent >= diffPktPercent && diffBitPercent >= diffSessionPercent){
profileId = dosDetectionThreshold.getProfileId();
result = getDosEventLog(dosSketchLog, bitBase, diffByte, profileId, STATIC_CONDITION_TYPE, BITS_TAG);
System.out.println(result);
}
}
private DosEventLog getDosEventLog(DosSketchLog value, long base, long diff, long profileId, int type, String tag) {
DosEventLog result = null;
String destinationIp = value.getDestination_ip();
String attackType = value.getAttack_type();
if (diff > 0 && base != 0) {
double percent = getDiffPercent(diff, base);
Severity severity = judgeSeverity(percent);
Integer staticSensitivityThreshold = 100;
if (severity != Severity.NORMAL) {
if (type == BASELINE_CONDITION_TYPE && percent < 0.2) {
// logger.debug("当前server IP:{},类型:{},基线值{}百分比{}未超过基线敏感阈值,日志详情\n{}", destinationIp, attackType, base, percent, value);
}else if ((type == BASELINE_CONDITION_TYPE || type == SENSITIVITY_CONDITION_TYPE) && value.getSketch_sessions() < staticSensitivityThreshold){
// logger.debug("当前server IP:{},类型:{},基线值{}百分比{}未超过静态敏感阈值,日志详情\n{}",destinationIp, attackType, base, percent, value);
}else {
result = getResult(value, base, profileId, severity, percent+1, type, tag);
if (type == SENSITIVITY_CONDITION_TYPE){
result.setSeverity(Severity.MAJOR.severity);
}
// logger.info("检测到当前server IP {} 存在 {} 异常,超出基线{} {}倍,基于{}:{}检测,日志详情\n {}", destinationIp,attackType,base,percent,type,tag,result);
}
}
// else {
// logger.debug("当前server IP:{} 未出现 {} 异常,日志详情 {}", destinationIp, attackType, value);
// }
}
return result;
}
private DosEventLog getResult(DosSketchLog value, long base, long profileId, Severity severity, double percent, int type, String tag) {
DosEventLog dosEventLog = new DosEventLog();
// dosEventLog.setLog_id(SnowflakeId.generateId());
dosEventLog.setVsys_id(value.getVsys_id());
dosEventLog.setStart_time(value.getSketch_start_time());
dosEventLog.setEnd_time(value.getSketch_start_time() + value.getSketch_duration());
dosEventLog.setProfile_id(profileId);
dosEventLog.setAttack_type(value.getAttack_type());
dosEventLog.setSeverity(severity.severity);
// dosEventLog.setConditions(getConditions(PERCENT_INSTANCE.format(percent), base, value.getSketch_sessions(), type, tag));
dosEventLog.setConditions(getConditions(percent, base, value.getSketch_sessions(), type, tag,dosEventLog));
dosEventLog.setDestination_ip(value.getDestination_ip());
// dosEventLog.setDestination_country(IpUtils.ipLookup.countryLookup(value.getDestination_ip()));
String ipList = value.getSource_ip();
dosEventLog.setSource_ip_list(ipList);
dosEventLog.setSource_country_list(getSourceCountryList(ipList));
dosEventLog.setSession_rate(value.getSketch_sessions());
dosEventLog.setPacket_rate(value.getSketch_packets());
dosEventLog.setBit_rate(value.getSketch_bytes());
return dosEventLog;
}
public String getConditions(double percent, long base, long sessions, int type, String tag,DosEventLog dosEventLog) {
int condition =0;
if ("Minor".equals(dosEventLog.getSeverity())){
condition=50;
}else if ("Warning".equals(dosEventLog.getSeverity())){
condition=100;
}else if ("Major".equals(dosEventLog.getSeverity())){
condition=250;
}else if ("Severe".equals(dosEventLog.getSeverity())){
condition=500;
}else if ("Critical".equals(dosEventLog.getSeverity())){
condition =800;
}
switch (type) {
case STATIC_CONDITION_TYPE:
return "Rate > " +
base + " " +
tag + "/s" + "(>"+condition+"%)";
case BASELINE_CONDITION_TYPE:
return tag + " > " +
PERCENT_INSTANCE.format(percent) + " of baseline";
case SENSITIVITY_CONDITION_TYPE:
return String.valueOf(sessions) + " " +
tag + "/s Unusually high " +
StringUtils.capitalize(tag);
default:
throw new IllegalArgumentException("Illegal Argument type:" + type + ", known types = [1,2,3]");
}
}
private String getSourceCountryList(String sourceIpList) {
if (StringUtil.isNotBlank(sourceIpList)) {
String countryList;
try {
String[] ipArr = sourceIpList.split(",");
HashSet<String> countrySet = new HashSet<>();
for (String ip : ipArr) {
String country = IpUtils.ipLookup.countryLookup(ip);
if (StringUtil.isNotBlank(country)){
countrySet.add(country);
}
}
countryList = StringUtils.join(countrySet, ", ");
return countryList;
} catch (Exception e) {
// logger.error("{} source IP lists 获取国家失败", sourceIpList, e);
return StringUtil.EMPTY;
}
} else {
throw new IllegalArgumentException("Illegal Argument sourceIpList = null");
}
}
private Double getDiffPercent(long diff, long base) {
return BigDecimal.valueOf((float) diff / base).setScale(4, BigDecimal.ROUND_HALF_UP).doubleValue();
}
private Severity judgeSeverity(double diffPercent) {
if (diffPercent >= 0.5 && diffPercent < 1) {
return Severity.MINOR;
} else if (diffPercent >= 1 && diffPercent < 2.5) {
return Severity.WARNING;
} else if (diffPercent >= 2.5 && diffPercent < 5) {
return Severity.MAJOR;
} else if (diffPercent >= 5 && diffPercent < 8) {
return Severity.SEVERE;
} else if (diffPercent >= 8) {
return Severity.CRITICAL;
} else {
return Severity.NORMAL;
}
}
private enum Severity {
/**
* 判断严重程度枚举类型
*/
CRITICAL("Critical"),
SEVERE("Severe"),
MAJOR("Major"),
WARNING("Warning"),
MINOR("Minor"),
NORMAL("Normal");
private final String severity;
@Override
public String toString() {
return this.severity;
}
Severity(String severity) {
this.severity = severity;
}
}
}

View File

@@ -0,0 +1,11 @@
package com.zdjizhi.etl;
import org.junit.Test;
public class EtlProcessFunctionTest {
@Test
public void EtlProcessFunctionTest(){
}
}