TSG-20112,TSG-20099适配功能端日志调整,重构代码
This commit is contained in:
@@ -3,18 +3,12 @@ package com.zdjizhi.function;
|
||||
import cn.hutool.log.Log;
|
||||
import cn.hutool.log.LogFactory;
|
||||
import com.geedgenetworks.utils.DateUtils;
|
||||
import com.geedgenetworks.utils.StringUtil;
|
||||
import com.zdjizhi.common.*;
|
||||
import com.zdjizhi.utils.Snowflakeld.SnowflakeId;
|
||||
import com.zdjizhi.utils.Threshold.ParseBaselineThreshold;
|
||||
import com.zdjizhi.utils.Threshold.ParseStaticThreshold;
|
||||
import com.zdjizhi.utils.connections.http.HttpClientService;
|
||||
import com.zdjizhi.utils.knowledgebase.IpLookupUtils;
|
||||
import inet.ipaddr.IPAddress;
|
||||
import inet.ipaddr.IPAddressString;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
import org.apache.flink.shaded.guava18.com.google.common.collect.TreeRangeMap;
|
||||
import org.apache.flink.streaming.api.functions.ProcessFunction;
|
||||
import org.apache.flink.util.Collector;
|
||||
|
||||
@@ -31,7 +25,7 @@ public class DosDetectionFunction extends ProcessFunction<DosSketchLog, DosEvent
|
||||
private static final Log logger = LogFactory.get();
|
||||
private Map<String, Map<String, DosBaselineThreshold>> baselineMap = new HashMap<>();
|
||||
private final NumberFormat PERCENT_INSTANCE = NumberFormat.getPercentInstance();
|
||||
private HashMap<Integer, HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>>> thresholdRangeMap;
|
||||
// private HashMap<Integer, HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>>> thresholdRangeMap;
|
||||
private final int BASELINE_SIZE = 144;
|
||||
private final int STATIC_CONDITION_TYPE = 1;
|
||||
private final int BASELINE_CONDITION_TYPE = 2;
|
||||
@@ -42,51 +36,17 @@ public class DosDetectionFunction extends ProcessFunction<DosSketchLog, DosEvent
|
||||
private final int OTHER_BASELINE_TYPE = 3;
|
||||
private SnowflakeId snowflakeId;
|
||||
private Configuration configuration;
|
||||
private HttpClientService httpClientService;
|
||||
private IpLookupUtils ipLookupUtils;
|
||||
//private IpLookupUtils ipLookupUtils;
|
||||
private ParseBaselineThreshold parseBaselineThresholdld;
|
||||
private ParseStaticThreshold parseStaticThreshold;
|
||||
|
||||
@Override
|
||||
public void open(Configuration parameters) {
|
||||
|
||||
configuration = (Configuration) getRuntimeContext()
|
||||
.getExecutionConfig().getGlobalJobParameters();
|
||||
httpClientService = new HttpClientService(configuration);
|
||||
|
||||
snowflakeId = new SnowflakeId(configuration.get(DATA_CENTER_ID_NUM), getRuntimeContext().getIndexOfThisSubtask());
|
||||
|
||||
try {
|
||||
ipLookupUtils = new IpLookupUtils(configuration, httpClientService);
|
||||
ipLookupUtils.stuffKnowledgeMetaCache();
|
||||
Timer timer = new Timer();
|
||||
timer.schedule(new TimerTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
ipLookupUtils.stuffKnowledgeMetaCache();
|
||||
logger.info("定位库定时调度成功");
|
||||
}
|
||||
}, configuration.get(KNOWLEDGE_BASE_SCHEDULE_MINUTES) * 60 * 1000, configuration.get(KNOWLEDGE_BASE_SCHEDULE_MINUTES) * 60 * 1000);
|
||||
} catch (Exception e) {
|
||||
logger.error("定位库加载失败,具体原因为" + e);
|
||||
}
|
||||
|
||||
try {
|
||||
parseStaticThreshold = new ParseStaticThreshold(configuration, httpClientService);
|
||||
thresholdRangeMap = parseStaticThreshold.createStaticThreshold();
|
||||
Timer timer = new Timer();
|
||||
timer.schedule(new TimerTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
thresholdRangeMap = parseStaticThreshold.createStaticThreshold();
|
||||
logger.info("基于静态阈值构建threshold RangeMap成功,Threshold RangeMap:" + thresholdRangeMap.toString());
|
||||
}
|
||||
}, configuration.get(STATIC_THRESHOLD_SCHEDULE_MINUTES) * 60 * 1000, configuration.get(STATIC_THRESHOLD_SCHEDULE_MINUTES) * 60 * 1000);
|
||||
|
||||
} catch (Exception e) {
|
||||
logger.error("基于静态阈值构建threshold RangeMap失败,失败原因为:" + e);
|
||||
}
|
||||
|
||||
try {
|
||||
parseBaselineThresholdld = new ParseBaselineThreshold(configuration);
|
||||
baselineMap = parseBaselineThresholdld.readFromHbase();
|
||||
@@ -95,7 +55,7 @@ public class DosDetectionFunction extends ProcessFunction<DosSketchLog, DosEvent
|
||||
@Override
|
||||
public void run() {
|
||||
baselineMap = parseBaselineThresholdld.readFromHbase();
|
||||
logger.info("从Hbase获取baselineMap成功,baselineMap:" + thresholdRangeMap.toString());
|
||||
logger.info("从Hbase获取baselineMap成功,baselineMap:" + baselineMap.toString());
|
||||
}
|
||||
}, configuration.get(BASELINE_THRESHOLD_SCHEDULE_DAYS) * 24 * 60 * 60 * 1000, configuration.get(BASELINE_THRESHOLD_SCHEDULE_DAYS) * 24 * 60 * 60 * 1000);
|
||||
|
||||
@@ -110,31 +70,29 @@ public class DosDetectionFunction extends ProcessFunction<DosSketchLog, DosEvent
|
||||
public void processElement(DosSketchLog value, Context ctx, Collector<DosEventLog> out) throws Exception {
|
||||
DosEventLog finalResult = null;
|
||||
try {
|
||||
String destinationIp = value.getDestination_ip();
|
||||
int vsysId = value.getVsys_id();
|
||||
String key = destinationIp + "-" + vsysId;
|
||||
String attackType = value.getAttack_type();
|
||||
IPAddress destinationIpAddress = new IPAddressString(destinationIp).getAddress();
|
||||
|
||||
DosDetectionThreshold threshold = null;
|
||||
if (thresholdRangeMap.containsKey(vsysId)) {
|
||||
threshold = thresholdRangeMap.get(vsysId).getOrDefault(attackType, TreeRangeMap.create()).get(destinationIpAddress);
|
||||
if (value.getRule_id() == 0) {
|
||||
String destinationIp = value.getServer_ip();
|
||||
int vsysId = value.getVsys_id();
|
||||
String key = destinationIp + "-" + vsysId;
|
||||
String attackType = value.getAttack_type();
|
||||
DosDetectionThreshold threshold = null;
|
||||
logger.debug("当前判断IP:{}, 类型: {}", key, attackType);
|
||||
if (threshold == null && baselineMap.containsKey(key)) {
|
||||
finalResult = getDosEventLogByBaseline(value, key);
|
||||
} else if (threshold == null && !baselineMap.containsKey(key)) {
|
||||
finalResult = getDosEventLogBySensitivityThreshold(value);
|
||||
}
|
||||
else {
|
||||
logger.debug("未获取到当前server IP:{} 类型 {} 静态阈值 和 baseline", key, attackType);
|
||||
}
|
||||
}
|
||||
else{
|
||||
finalResult = getResult(value,0,0,Severity.MAJOR,0.0,0,"DoS Protection [12]");
|
||||
}
|
||||
} catch(Exception e){
|
||||
logger.error("判定失败\n {} \n{}", value, e);
|
||||
}
|
||||
|
||||
logger.debug("当前判断IP:{}, 类型: {}", key, attackType);
|
||||
if (threshold == null && baselineMap.containsKey(key)) {
|
||||
finalResult = getDosEventLogByBaseline(value, key);
|
||||
} else if (threshold == null && !baselineMap.containsKey(key)) {
|
||||
finalResult = getDosEventLogBySensitivityThreshold(value);
|
||||
} else if (threshold != null) {
|
||||
finalResult = getDosEventLogByStaticThreshold(value, threshold);
|
||||
} else {
|
||||
logger.debug("未获取到当前server IP:{} 类型 {} 静态阈值 和 baseline", key, attackType);
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
logger.error("判定失败\n {} \n{}", value, e);
|
||||
}
|
||||
if (finalResult != null) {
|
||||
out.collect(finalResult);
|
||||
}
|
||||
@@ -142,7 +100,7 @@ public class DosDetectionFunction extends ProcessFunction<DosSketchLog, DosEvent
|
||||
|
||||
|
||||
private DosEventLog getDosEventLogBySensitivityThreshold(DosSketchLog value) {
|
||||
long sketchSessions = value.getSketch_sessions();
|
||||
long sketchSessions = value.getSessions();
|
||||
Integer staticSensitivityThreshold = configuration.get(STATIC_SENSITIVITY_THRESHOLD);
|
||||
long diff = sketchSessions - staticSensitivityThreshold;
|
||||
return getDosEventLog(value, staticSensitivityThreshold, diff, 0, SENSITIVITY_CONDITION_TYPE, SESSIONS_TAG);
|
||||
@@ -150,55 +108,16 @@ public class DosDetectionFunction extends ProcessFunction<DosSketchLog, DosEvent
|
||||
|
||||
private DosEventLog getDosEventLogByBaseline(DosSketchLog value, String key) {
|
||||
String attackType = value.getAttack_type();
|
||||
long sketchSessions = value.getSketch_sessions();
|
||||
long sketchSessions = value.getSessions();
|
||||
DosBaselineThreshold dosBaselineThreshold = baselineMap.get(key).get(attackType);
|
||||
Integer base = getBaseValue(dosBaselineThreshold, value);
|
||||
long diff = sketchSessions - base;
|
||||
return getDosEventLog(value, base, diff, 0, BASELINE_CONDITION_TYPE, SESSIONS_TAG);
|
||||
}
|
||||
|
||||
private DosEventLog getDosEventLogByStaticThreshold(DosSketchLog value, DosDetectionThreshold threshold) throws CloneNotSupportedException {
|
||||
long sessionBase = threshold.getSessions_per_sec();
|
||||
long pktBase = threshold.getPackets_per_sec();
|
||||
long bitBase = threshold.getBits_per_sec();
|
||||
|
||||
long diffSession = value.getSketch_sessions() - sessionBase;
|
||||
long diffPkt = value.getSketch_packets() - pktBase;
|
||||
long diffByte = value.getSketch_bytes() - bitBase;
|
||||
|
||||
double diffSessionPercent = 0.0;
|
||||
double diffPktPercent = 0.0;
|
||||
double diffBitPercent = 0.0;
|
||||
|
||||
if (sessionBase > 0) {
|
||||
diffSessionPercent = getDiffPercent(diffSession, sessionBase) * 100;
|
||||
}
|
||||
if (pktBase > 0) {
|
||||
diffPktPercent = getDiffPercent(diffPkt, pktBase) * 100;
|
||||
}
|
||||
if (bitBase > 0) {
|
||||
diffBitPercent = getDiffPercent(diffByte, bitBase) * 100;
|
||||
}
|
||||
|
||||
long profileId = 0;
|
||||
DosEventLog result = null;
|
||||
|
||||
if (diffSessionPercent >= diffPktPercent && diffSessionPercent >= diffBitPercent) {
|
||||
profileId = threshold.getId();
|
||||
result = getDosEventLog(value, sessionBase, diffSession, profileId, STATIC_CONDITION_TYPE, SESSIONS_TAG);
|
||||
} else if (diffPktPercent >= diffSessionPercent && diffPktPercent >= diffBitPercent) {
|
||||
profileId = threshold.getId();
|
||||
result = getDosEventLog(value, pktBase, diffPkt, profileId, STATIC_CONDITION_TYPE, PACKETS_TAG);
|
||||
} else if (diffBitPercent >= diffPktPercent && diffBitPercent >= diffSessionPercent) {
|
||||
profileId = threshold.getId();
|
||||
result = getDosEventLog(value, bitBase, diffByte, profileId, STATIC_CONDITION_TYPE, BITS_TAG);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private DosEventLog getDosEventLog(DosSketchLog value, long base, long diff, long profileId, int type, String tag) {
|
||||
DosEventLog result = null;
|
||||
String destinationIp = value.getDestination_ip();
|
||||
String destinationIp = value.getServer_ip();
|
||||
String attackType = value.getAttack_type();
|
||||
if (diff > 0 && base != 0) {
|
||||
double percent = getDiffPercent(diff, base);
|
||||
@@ -207,7 +126,7 @@ public class DosDetectionFunction extends ProcessFunction<DosSketchLog, DosEvent
|
||||
if (severity != Severity.NORMAL) {
|
||||
if (type == BASELINE_CONDITION_TYPE && percent < configuration.get(BASELINE_SENSITIVITY_THRESHOLD)) {
|
||||
logger.debug("当前server IP:{},类型:{},基线值{}百分比{}未超过基线敏感阈值,日志详情\n{}", destinationIp, attackType, base, percent, value);
|
||||
} else if ((type == BASELINE_CONDITION_TYPE || type == SENSITIVITY_CONDITION_TYPE) && value.getSketch_sessions() < staticSensitivityThreshold) {
|
||||
} else if ((type == BASELINE_CONDITION_TYPE || type == SENSITIVITY_CONDITION_TYPE) && value.getSessions() < staticSensitivityThreshold) {
|
||||
logger.debug("当前server IP:{},类型:{},基线值{}百分比{}未超过静态敏感阈值,日志详情\n{}", destinationIp, attackType, base, percent, value);
|
||||
} else {
|
||||
result = getResult(value, base, profileId, severity, percent, type, tag);
|
||||
@@ -225,23 +144,46 @@ public class DosDetectionFunction extends ProcessFunction<DosSketchLog, DosEvent
|
||||
|
||||
private DosEventLog getResult(DosSketchLog value, long base, long profileId, Severity severity, double percent, int type, String tag) {
|
||||
DosEventLog dosEventLog = new DosEventLog();
|
||||
dosEventLog.setRecv_time(value.getCommon_recv_time());
|
||||
dosEventLog.setRecv_time(value.getRecv_time());
|
||||
dosEventLog.setLog_id(snowflakeId.nextId());
|
||||
dosEventLog.setVsys_id(value.getVsys_id());
|
||||
dosEventLog.setStart_time(value.getSketch_start_time());
|
||||
dosEventLog.setEnd_time(value.getSketch_start_time() + value.getSketch_duration());
|
||||
dosEventLog.setStart_time(value.getStart_timestamp_ms()/1000);
|
||||
dosEventLog.setEnd_time(value.getEnd_timestamp_ms()/1000);
|
||||
dosEventLog.setProfile_id(profileId);
|
||||
dosEventLog.setRule_id(value.getRule_id());
|
||||
dosEventLog.setAttack_type(value.getAttack_type());
|
||||
dosEventLog.setSeverity(severity.severity);
|
||||
dosEventLog.setConditions(getConditions(PERCENT_INSTANCE.format(percent), base, value.getSketch_sessions(), type, tag, dosEventLog));
|
||||
dosEventLog.setDestination_ip(value.getDestination_ip());
|
||||
dosEventLog.setDestination_country(ipLookupUtils.getCountryLookup(value.getDestination_ip()));
|
||||
String ipList = value.getSource_ip();
|
||||
dosEventLog.setSource_ip_list(ipList);
|
||||
dosEventLog.setSource_country_list(getSourceCountryList(ipList));
|
||||
dosEventLog.setSession_rate(value.getSketch_sessions());
|
||||
dosEventLog.setPacket_rate(value.getSketch_packets());
|
||||
dosEventLog.setBit_rate(value.getSketch_bytes());
|
||||
if(base != 0) {
|
||||
dosEventLog.setSeverity(severity.severity);
|
||||
dosEventLog.setConditions(getConditions(PERCENT_INSTANCE.format(percent), base, value.getSessions(), type, tag, dosEventLog));
|
||||
}
|
||||
else{
|
||||
dosEventLog.setSeverity(severity.severity);
|
||||
dosEventLog.setConditions(tag);
|
||||
}
|
||||
dosEventLog.setDestination_ip(value.getServer_ip());
|
||||
dosEventLog.setDestination_country(value.getServer_country());
|
||||
StringBuilder client_ips = new StringBuilder();
|
||||
StringBuilder client_countrys = new StringBuilder();
|
||||
Iterator<Map.Entry<String, String>> iterator = value.getClientips_countrys().entrySet().iterator();
|
||||
while (iterator.hasNext()) {
|
||||
Map.Entry<String, String> entry = iterator.next();
|
||||
client_ips.append(entry.getKey());
|
||||
client_countrys.append(entry.getValue());
|
||||
if (iterator.hasNext()) {
|
||||
client_ips.append(",");
|
||||
client_countrys.append(",");
|
||||
}
|
||||
}
|
||||
if(client_ips.length()>0){
|
||||
dosEventLog.setSource_ip_list(client_ips.toString());
|
||||
dosEventLog.setSource_country_list(client_countrys.toString());
|
||||
}
|
||||
dosEventLog.setSession_rate(value.getSession_rate());
|
||||
dosEventLog.setPacket_rate(value.getPacket_rate());
|
||||
dosEventLog.setBit_rate(value.getBit_rate());
|
||||
dosEventLog.setBytes(value.getBytes());
|
||||
dosEventLog.setSessions(value.getSessions());
|
||||
dosEventLog.setPackets(value.getPkts());
|
||||
return dosEventLog;
|
||||
}
|
||||
|
||||
@@ -253,10 +195,10 @@ public class DosDetectionFunction extends ProcessFunction<DosSketchLog, DosEvent
|
||||
Integer defaultVaule = dosBaselineThreshold.getSession_rate_default_value();
|
||||
Integer sessionRateBaselineType = dosBaselineThreshold.getSession_rate_baseline_type();
|
||||
if (baselines != null && baselines.size() == BASELINE_SIZE) {
|
||||
int timeIndex = getCurrentTimeIndex(value.getSketch_start_time());
|
||||
int timeIndex = getCurrentTimeIndex(value.getStart_timestamp_ms());
|
||||
base = baselines.get(timeIndex);
|
||||
if (base == 0) {
|
||||
logger.debug("获取到当前IP: {},类型: {} baseline值为0,替换为P95观测值{}", value.getDestination_ip(), value.getAttack_type(), defaultVaule);
|
||||
logger.debug("获取到当前IP: {},类型: {} baseline值为0,替换为P95观测值{}", value.getServer_ip(), value.getAttack_type(), defaultVaule);
|
||||
base = defaultVaule;
|
||||
}
|
||||
if (sessionRateBaselineType == OTHER_BASELINE_TYPE && base < configuration.get(STATIC_SENSITIVITY_THRESHOLD)) {
|
||||
@@ -300,29 +242,6 @@ public class DosDetectionFunction extends ProcessFunction<DosSketchLog, DosEvent
|
||||
}
|
||||
}
|
||||
|
||||
private String getSourceCountryList(String sourceIpList) {
|
||||
if (StringUtil.isNotBlank(sourceIpList)) {
|
||||
String countryList;
|
||||
try {
|
||||
String[] ipArr = sourceIpList.split(",");
|
||||
HashSet<String> countrySet = new HashSet<>();
|
||||
for (String ip : ipArr) {
|
||||
String country = ipLookupUtils.getCountryLookup(ip);
|
||||
if (StringUtil.isNotBlank(country)) {
|
||||
countrySet.add(country);
|
||||
}
|
||||
}
|
||||
countryList = StringUtils.join(countrySet, ", ");
|
||||
return countryList;
|
||||
} catch (Exception e) {
|
||||
logger.error("{} source IP lists 获取国家失败", sourceIpList, e);
|
||||
return StringUtil.EMPTY;
|
||||
}
|
||||
} else {
|
||||
throw new IllegalArgumentException("Illegal Argument sourceIpList = null");
|
||||
}
|
||||
}
|
||||
|
||||
private int getCurrentTimeIndex(long sketchStartTime) {
|
||||
int index = 0;
|
||||
try {
|
||||
|
||||
@@ -0,0 +1,64 @@
|
||||
package com.zdjizhi.function;
|
||||
|
||||
import com.alibaba.fastjson2.JSON;
|
||||
import com.alibaba.fastjson2.JSONObject;
|
||||
import com.geedgenetworks.utils.StringUtil;
|
||||
import com.zdjizhi.common.DosMetricsLog;
|
||||
import com.zdjizhi.common.DosSketchLog;
|
||||
import com.zdjizhi.common.pojo.DosSketchMetricsLog;
|
||||
import org.apache.flink.api.common.functions.FlatMapFunction;
|
||||
import org.apache.flink.api.common.functions.RichFlatMapFunction;
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
import org.apache.flink.util.Collector;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static com.zdjizhi.conf.DosConfigs.DESTINATION_IP_PARTITION_NUM;
|
||||
import static com.zdjizhi.conf.DosConfigs.FLINK_WINDOW_MAX_TIME;
|
||||
|
||||
public class DosMetricsRichFunction extends RichFlatMapFunction<DosSketchLog, String> {
|
||||
private static final Logger logger = LoggerFactory.getLogger(DosMetricsRichFunction.class);
|
||||
|
||||
private Configuration configuration;
|
||||
|
||||
@Override
|
||||
public void open(Configuration parameters) throws Exception {
|
||||
super.open(parameters);
|
||||
configuration = (Configuration) getRuntimeContext()
|
||||
.getExecutionConfig().getGlobalJobParameters();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flatMap(DosSketchLog dosSketchLog, Collector<String> out) throws Exception {
|
||||
try {
|
||||
|
||||
if(dosSketchLog.getRule_id()==0) {
|
||||
DosMetricsLog dosMetricsLog = new DosMetricsLog();
|
||||
dosMetricsLog.setSketch_start_time(dosSketchLog.getStart_timestamp_ms() / 1000);
|
||||
dosMetricsLog.setDestination_ip(dosSketchLog.getServer_ip());
|
||||
dosMetricsLog.setAttack_type(dosSketchLog.getAttack_type());
|
||||
dosMetricsLog.setSession_rate(dosSketchLog.getSession_rate());
|
||||
dosMetricsLog.setPacket_rate(dosSketchLog.getPacket_rate());
|
||||
dosMetricsLog.setBit_rate(dosSketchLog.getBit_rate());
|
||||
dosMetricsLog.setVsys_id(dosSketchLog.getVsys_id());
|
||||
dosMetricsLog.setPartition_num(getPartitionNumByIp(dosSketchLog.getServer_ip()));
|
||||
String jsonString = JSON.toJSONString(dosMetricsLog);
|
||||
logger.debug("metric 结果已加载:{}", jsonString);
|
||||
out.collect(jsonString);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("数据解析错误:", e);
|
||||
}
|
||||
}
|
||||
private long timeFloor(long sketchStartTime) {
|
||||
return sketchStartTime / configuration.get(FLINK_WINDOW_MAX_TIME) * configuration.get(FLINK_WINDOW_MAX_TIME);
|
||||
}
|
||||
|
||||
private int getPartitionNumByIp(String destinationIp) {
|
||||
return Math.abs(destinationIp.hashCode()) % configuration.get(DESTINATION_IP_PARTITION_NUM);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,141 +0,0 @@
|
||||
package com.zdjizhi.function;
|
||||
|
||||
import cn.hutool.log.Log;
|
||||
import cn.hutool.log.LogFactory;
|
||||
import com.zdjizhi.common.DosMetricsLog;
|
||||
import com.zdjizhi.common.DosSketchLog;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.flink.api.java.tuple.Tuple3;
|
||||
import org.apache.flink.api.java.tuple.Tuple7;
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
|
||||
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
|
||||
import org.apache.flink.util.Collector;
|
||||
import org.apache.flink.util.OutputTag;
|
||||
|
||||
import java.util.HashSet;
|
||||
|
||||
import static com.zdjizhi.conf.DosConfigs.*;
|
||||
|
||||
/**
|
||||
* @author 94976
|
||||
*/
|
||||
public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosSketchLog, Tuple3<String, String, Integer>, TimeWindow> {
|
||||
|
||||
private static final Log logger = LogFactory.get();
|
||||
private final String EMPTY_SOURCE_IP_IPV4 = "0.0.0.0";
|
||||
private final String EMPTY_SOURCE_IP_IPV6 = "::";
|
||||
public static OutputTag<DosMetricsLog> outputTag = new OutputTag<DosMetricsLog>("traffic server ip metrics") {
|
||||
};
|
||||
private Configuration configuration;
|
||||
|
||||
@Override
|
||||
public void open(Configuration parameters) throws Exception {
|
||||
super.open(parameters);
|
||||
configuration = (Configuration) getRuntimeContext()
|
||||
.getExecutionConfig().getGlobalJobParameters();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(Tuple3<String, String, Integer> keys,
|
||||
Context context, Iterable<DosSketchLog> elements,
|
||||
Collector<DosSketchLog> out) {
|
||||
DosSketchLog middleResult = getMiddleResult(keys, elements);
|
||||
try {
|
||||
if (middleResult != null) {
|
||||
out.collect(middleResult);
|
||||
logger.debug("获取中间聚合结果:{}", middleResult.toString());
|
||||
context.output(outputTag, getOutputMetric(middleResult));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("获取中间聚合结果失败,middleResult: {}\n{}", middleResult.toString(), e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private DosMetricsLog getOutputMetric(DosSketchLog midResuleLog) {
|
||||
DosMetricsLog dosMetricsLog = new DosMetricsLog();
|
||||
dosMetricsLog.setSketch_start_time(timeFloor(System.currentTimeMillis() / 1000));
|
||||
dosMetricsLog.setDestination_ip(midResuleLog.getDestination_ip());
|
||||
dosMetricsLog.setAttack_type(midResuleLog.getAttack_type());
|
||||
dosMetricsLog.setSession_rate(midResuleLog.getSketch_sessions());
|
||||
dosMetricsLog.setPacket_rate(midResuleLog.getSketch_packets());
|
||||
dosMetricsLog.setBit_rate(midResuleLog.getSketch_bytes());
|
||||
dosMetricsLog.setVsys_id(midResuleLog.getVsys_id());
|
||||
dosMetricsLog.setPartition_num(getPartitionNumByIp(midResuleLog.getDestination_ip()));
|
||||
logger.debug("metric 结果已加载:{}", dosMetricsLog.toString());
|
||||
return dosMetricsLog;
|
||||
}
|
||||
|
||||
private long timeFloor(long sketchStartTime) {
|
||||
return sketchStartTime / configuration.get(FLINK_WINDOW_MAX_TIME) * configuration.get(FLINK_WINDOW_MAX_TIME);
|
||||
}
|
||||
|
||||
private int getPartitionNumByIp(String destinationIp) {
|
||||
return Math.abs(destinationIp.hashCode()) % configuration.get(DESTINATION_IP_PARTITION_NUM);
|
||||
}
|
||||
|
||||
private DosSketchLog getMiddleResult(Tuple3<String, String, Integer> keys, Iterable<DosSketchLog> elements) {
|
||||
|
||||
DosSketchLog midResuleLog = new DosSketchLog();
|
||||
Tuple7<Long, Long, Long, String, Long, Long, Long> values = sketchAggregate(elements);
|
||||
try {
|
||||
if (values != null) {
|
||||
midResuleLog.setAttack_type(keys.f0);
|
||||
midResuleLog.setDestination_ip(keys.f1);
|
||||
midResuleLog.setVsys_id(keys.f2);
|
||||
midResuleLog.setSketch_start_time(values.f4);
|
||||
midResuleLog.setSketch_duration(values.f5);
|
||||
midResuleLog.setSource_ip(values.f3);
|
||||
midResuleLog.setSketch_sessions(values.f0);
|
||||
midResuleLog.setSketch_packets(values.f1);
|
||||
midResuleLog.setSketch_bytes(values.f2);
|
||||
midResuleLog.setCommon_recv_time(values.f6);
|
||||
return midResuleLog;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("加载中间结果集失败,keys: {} values: {}\n{}", keys, values, e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private Tuple7<Long, Long, Long, String, Long, Long, Long> sketchAggregate(Iterable<DosSketchLog> elements) {
|
||||
long sessions = 0;
|
||||
long packets = 0;
|
||||
long bytes = 0;
|
||||
long startTime = System.currentTimeMillis() / 1000;
|
||||
long endTime = System.currentTimeMillis() / 1000;
|
||||
long duration = 0;
|
||||
long recvtime = 0;
|
||||
HashSet<String> sourceIpSet = new HashSet<>();
|
||||
try {
|
||||
for (DosSketchLog newSketchLog : elements) {
|
||||
if (recvtime == 0) {
|
||||
recvtime = newSketchLog.getCommon_recv_time();
|
||||
} else if (recvtime > newSketchLog.getCommon_recv_time()) {
|
||||
recvtime = newSketchLog.getCommon_recv_time();
|
||||
}
|
||||
String sourceIp = newSketchLog.getSource_ip();
|
||||
if (StringUtils.equals(sourceIp, EMPTY_SOURCE_IP_IPV4) || StringUtils.equals(sourceIp, EMPTY_SOURCE_IP_IPV6)) {
|
||||
sessions += newSketchLog.getSketch_sessions();
|
||||
packets += newSketchLog.getSketch_packets();
|
||||
bytes += newSketchLog.getSketch_bytes();
|
||||
startTime = newSketchLog.getSketch_start_time() > startTime ? startTime : newSketchLog.getSketch_start_time();
|
||||
endTime = newSketchLog.getSketch_start_time() > endTime ? newSketchLog.getSketch_start_time() : endTime;
|
||||
duration = endTime - startTime == 0 ? 5 : endTime - startTime;
|
||||
} else {
|
||||
if (sourceIpSet.size() < configuration.get(SOURCE_IP_LIST_LIMIT)) {
|
||||
sourceIpSet.add(sourceIp);
|
||||
}
|
||||
}
|
||||
}
|
||||
String sourceIpList = StringUtils.join(sourceIpSet, ",");
|
||||
return Tuple7.of(sessions / configuration.get(FLINK_WINDOW_MAX_TIME), packets / configuration.get(FLINK_WINDOW_MAX_TIME),
|
||||
bytes * 8 / configuration.get(FLINK_WINDOW_MAX_TIME), sourceIpList, startTime, duration, recvtime);
|
||||
} catch (Exception e) {
|
||||
logger.error("聚合中间结果集失败 {}", e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -3,6 +3,7 @@ package com.zdjizhi.function;
|
||||
import com.alibaba.fastjson2.JSONObject;
|
||||
import com.geedgenetworks.utils.StringUtil;
|
||||
import com.zdjizhi.common.DosSketchLog;
|
||||
import com.zdjizhi.common.pojo.DosSketchMetricsLog;
|
||||
import org.apache.flink.api.common.functions.FlatMapFunction;
|
||||
import org.apache.flink.util.Collector;
|
||||
import org.slf4j.Logger;
|
||||
@@ -10,44 +11,58 @@ import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
public class FlatSketchFunction implements FlatMapFunction<String, DosSketchLog> {
|
||||
private static Logger logger = LoggerFactory.getLogger(FlatSketchFunction.class);
|
||||
@Override
|
||||
public void flatMap(String value, Collector<DosSketchLog> out) {
|
||||
|
||||
try {
|
||||
if (StringUtil.isNotBlank(value)) {
|
||||
final long recv_time = System.currentTimeMillis()/1000;
|
||||
HashMap<String, Object> sketchSource = JSONObject.parseObject(value, HashMap.class);
|
||||
long sketchStartTime = Long.parseLong(sketchSource.get("sketch_start_time").toString());
|
||||
long sketchDuration = Long.parseLong(sketchSource.get("sketch_duration").toString());
|
||||
String attackType = sketchSource.get("attack_type").toString();
|
||||
int vsysId = Integer.parseInt(sketchSource.getOrDefault("common_vsys_id", 1).toString());
|
||||
String report_ip_list = JSONObject.toJSONString(sketchSource.get("report_ip_list"));
|
||||
ArrayList<HashMap<String, Object>> reportIpList = JSONObject.parseObject(report_ip_list, ArrayList.class);
|
||||
for (HashMap<String, Object> obj : reportIpList) {
|
||||
DosSketchLog dosSketchLog = new DosSketchLog();
|
||||
dosSketchLog.setCommon_recv_time(recv_time);
|
||||
dosSketchLog.setSketch_start_time(sketchStartTime);
|
||||
dosSketchLog.setSketch_duration(sketchDuration);
|
||||
dosSketchLog.setAttack_type(attackType);
|
||||
dosSketchLog.setVsys_id(vsysId);
|
||||
String sourceIp = obj.get("source_ip").toString();
|
||||
String destinationIp = obj.get("destination_ip").toString();
|
||||
long sketchSessions = Long.parseLong(obj.get("sketch_sessions").toString());
|
||||
long sketchPackets = Long.parseLong(obj.get("sketch_packets").toString());
|
||||
long sketchBytes = Long.parseLong(obj.get("sketch_bytes").toString());
|
||||
dosSketchLog.setSource_ip(sourceIp);
|
||||
dosSketchLog.setDestination_ip(destinationIp);
|
||||
dosSketchLog.setSketch_sessions(sketchSessions);
|
||||
dosSketchLog.setSketch_packets(sketchPackets);
|
||||
dosSketchLog.setSketch_bytes(sketchBytes);
|
||||
out.collect(dosSketchLog);
|
||||
logger.debug("数据解析成功:{}", dosSketchLog);
|
||||
DosSketchLog dosSketchLog = new DosSketchLog();
|
||||
dosSketchLog.setRecv_time(System.currentTimeMillis()/1000);
|
||||
DosSketchMetricsLog dosSketchMetricsLog = JSONObject.parseObject(value, DosSketchMetricsLog.class);
|
||||
dosSketchLog.setVsys_id(Integer.parseInt(dosSketchMetricsLog.getTags().getOrDefault("vsys_id", "1")));
|
||||
dosSketchLog.setServer_ip(dosSketchMetricsLog.getTags().getOrDefault("server_ip", ""));
|
||||
dosSketchLog.setDecoded_as(dosSketchMetricsLog.getTags().getOrDefault("decoded_as", ""));
|
||||
dosSketchLog.setDuration(Long.parseLong(dosSketchMetricsLog.getTags().getOrDefault("duration","60000")));
|
||||
dosSketchLog.setTimestamp_ms(dosSketchMetricsLog.getTimestamp_ms());
|
||||
dosSketchLog.setStart_timestamp_ms(dosSketchMetricsLog.getTimestamp_ms());
|
||||
dosSketchLog.setEnd_timestamp_ms(dosSketchMetricsLog.getTimestamp_ms() + dosSketchLog.getDuration());
|
||||
dosSketchLog.setClient_ip(dosSketchMetricsLog.getTags().getOrDefault("client_ip", ""));
|
||||
dosSketchLog.setData_center(dosSketchMetricsLog.getTags().getOrDefault("data_center", ""));
|
||||
dosSketchLog.setDevice_id(dosSketchMetricsLog.getTags().getOrDefault("device_id", ""));
|
||||
dosSketchLog.setDevice_group(dosSketchMetricsLog.getTags().getOrDefault("device_group", ""));
|
||||
dosSketchLog.setServer_country(dosSketchMetricsLog.getTags().getOrDefault("server_country", ""));
|
||||
dosSketchLog.setClient_country(dosSketchMetricsLog.getTags().getOrDefault("client_country", ""));
|
||||
dosSketchLog.setRule_id(Integer.parseInt(dosSketchMetricsLog.getTags().getOrDefault("rule_id", "0")));
|
||||
dosSketchLog.setName(dosSketchMetricsLog.getTags().getOrDefault("name", ""));
|
||||
|
||||
Map<String,String> clientips_countrys = new HashMap<>();
|
||||
dosSketchLog.setClientips_countrys(clientips_countrys);
|
||||
if("top_client_and_server_ip".equals(dosSketchMetricsLog.getName())){
|
||||
dosSketchLog.setPkts(dosSketchMetricsLog.getFields().getOrDefault("pkts",0L));
|
||||
dosSketchLog.setBytes(dosSketchMetricsLog.getFields().getOrDefault("bytes",0L));
|
||||
dosSketchLog.setSessions(dosSketchMetricsLog.getFields().getOrDefault("sessions",0L));
|
||||
clientips_countrys.put(dosSketchLog.getClient_ip(),dosSketchLog.getClient_country());
|
||||
}
|
||||
else if("top_client_ip_and_server_ip".equals(dosSketchMetricsLog.getName())){
|
||||
dosSketchLog.setPkts(0);
|
||||
dosSketchLog.setBytes(0);
|
||||
dosSketchLog.setSessions(0);
|
||||
clientips_countrys.put(dosSketchLog.getClient_ip(),dosSketchLog.getClient_country());
|
||||
}
|
||||
else {
|
||||
dosSketchLog.setPkts(dosSketchMetricsLog.getFields().getOrDefault("pkts",0L));
|
||||
dosSketchLog.setBytes(dosSketchMetricsLog.getFields().getOrDefault("bytes",0L));
|
||||
dosSketchLog.setSessions(dosSketchMetricsLog.getFields().getOrDefault("sessions",0L));
|
||||
}
|
||||
out.collect(dosSketchLog);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("数据解析错误:{} \n{}", value, e);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,25 @@
|
||||
package com.zdjizhi.function;
|
||||
|
||||
import com.zdjizhi.common.DosSketchLog;
|
||||
import org.apache.flink.api.common.functions.ReduceFunction;
|
||||
|
||||
public class MetricsAggregationReduce implements ReduceFunction<DosSketchLog> {
|
||||
|
||||
@Override
|
||||
public DosSketchLog reduce(DosSketchLog value1, DosSketchLog value2) throws Exception {
|
||||
value1.setPkts(value1.getPkts() + value2.getPkts());
|
||||
value1.setBytes(value1.getBytes() + value2.getBytes());
|
||||
value1.setSessions(value1.getSessions() + value2.getSessions());
|
||||
if (value1.getRecv_time() > value2.getRecv_time()) {
|
||||
value1.setRecv_time(value2.getRecv_time());
|
||||
}
|
||||
if (value1.getStart_timestamp_ms() > value2.getStart_timestamp_ms()) {
|
||||
value1.setStart_timestamp_ms(value2.getStart_timestamp_ms());
|
||||
}
|
||||
if (value1.getEnd_timestamp_ms() < value2.getEnd_timestamp_ms()) {
|
||||
value1.setEnd_timestamp_ms(value2.getEnd_timestamp_ms());
|
||||
}
|
||||
value1.getClientips_countrys().putAll((value2.getClientips_countrys()));
|
||||
return value1;
|
||||
}
|
||||
}
|
||||
43
src/main/java/com/zdjizhi/function/MetricsCalculate.java
Normal file
43
src/main/java/com/zdjizhi/function/MetricsCalculate.java
Normal file
@@ -0,0 +1,43 @@
|
||||
package com.zdjizhi.function;
|
||||
|
||||
import com.zdjizhi.common.DosSketchLog;
|
||||
import org.apache.flink.api.java.tuple.Tuple3;
|
||||
import org.apache.flink.api.java.tuple.Tuple4;
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
|
||||
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
|
||||
import org.apache.flink.util.Collector;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
public class MetricsCalculate extends ProcessWindowFunction<
|
||||
DosSketchLog, // 输入类型
|
||||
DosSketchLog, // 输出类型
|
||||
Tuple4<String, String, Integer, Integer>, // 键类型
|
||||
TimeWindow> { // 窗口类型
|
||||
private final Map<String, String> attackTypeMapping = new HashMap<>();
|
||||
|
||||
@Override
|
||||
public void open(Configuration parameters) throws Exception {
|
||||
super.open(parameters);
|
||||
attackTypeMapping.put("TCP SYN","TCP SYN Flood");
|
||||
attackTypeMapping.put("DNS","UDP Flood");
|
||||
attackTypeMapping.put("ICMP","ICMP Flood");
|
||||
attackTypeMapping.put("UDP","DNS Flood");
|
||||
attackTypeMapping.put("NTP","NTP Flood");
|
||||
attackTypeMapping.put("","Custom Network Attack");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(Tuple4<String, String, Integer, Integer> key, ProcessWindowFunction<DosSketchLog, DosSketchLog, Tuple4<String, String, Integer,Integer>, TimeWindow>.Context context, Iterable<DosSketchLog> elements, Collector<DosSketchLog> out) throws Exception {
|
||||
|
||||
for (DosSketchLog dosSketchLog: elements){
|
||||
dosSketchLog.setSession_rate(dosSketchLog.getSessions()/ (dosSketchLog.getDuration()/1000) );
|
||||
dosSketchLog.setPacket_rate(dosSketchLog.getPkts()/(dosSketchLog.getDuration()/1000));
|
||||
dosSketchLog.setBit_rate(dosSketchLog.getBytes()/(dosSketchLog.getDuration()/1000));
|
||||
dosSketchLog.setAttack_type(attackTypeMapping.getOrDefault(dosSketchLog.getDecoded_as(),""));
|
||||
out.collect(dosSketchLog);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2,14 +2,15 @@ package com.zdjizhi.function;
|
||||
|
||||
import com.zdjizhi.common.DosSketchLog;
|
||||
import org.apache.flink.api.java.functions.KeySelector;
|
||||
import org.apache.flink.api.java.tuple.Tuple3;
|
||||
import org.apache.flink.api.java.tuple.Tuple4;
|
||||
|
||||
public class SketchKeysSelector implements KeySelector<DosSketchLog, Tuple3<String, String, Integer>> {
|
||||
public class SketchKeysSelector implements KeySelector<DosSketchLog, Tuple4<String, String, Integer,Integer>> {
|
||||
@Override
|
||||
public Tuple3<String, String, Integer> getKey(DosSketchLog dosSketchLog){
|
||||
return Tuple3.of(
|
||||
dosSketchLog.getAttack_type(),
|
||||
dosSketchLog.getDestination_ip(),
|
||||
dosSketchLog.getVsys_id());
|
||||
public Tuple4<String, String, Integer,Integer> getKey(DosSketchLog dosSketchLog){
|
||||
return Tuple4.of(
|
||||
dosSketchLog.getDecoded_as(),
|
||||
dosSketchLog.getServer_ip(),
|
||||
dosSketchLog.getVsys_id(),
|
||||
dosSketchLog.getRule_id());
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user