diff --git a/pom.xml b/pom.xml index 6bed19e..5628354 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.zdjizhi flink-dos-detection - 2.1 + 24-04-19 flink-dos-detection http://www.example.com @@ -80,7 +80,7 @@ - flink-dos-detection + flink-dos-detection-24-04-19 org.apache.http diff --git a/src/main/java/com/zdjizhi/common/DosSketchLog.java b/src/main/java/com/zdjizhi/common/DosSketchLog.java index 24c65d2..7a1239d 100644 --- a/src/main/java/com/zdjizhi/common/DosSketchLog.java +++ b/src/main/java/com/zdjizhi/common/DosSketchLog.java @@ -1,6 +1,7 @@ package com.zdjizhi.common; import java.io.Serializable; +import java.util.HashSet; import java.util.Map; import java.util.Objects; @@ -25,7 +26,9 @@ public class DosSketchLog implements Serializable { private long start_timestamp_ms; private long end_timestamp_ms; private long duration; - private Map clientips_countrys; + private HashSet client_ips; + private HashSet client_countrys; + private long session_rate; private long packet_rate; private long bit_rate; @@ -48,6 +51,22 @@ public class DosSketchLog implements Serializable { '}'; } + public HashSet getClient_ips() { + return client_ips; + } + + public void setClient_ips(HashSet client_ips) { + this.client_ips = client_ips; + } + + public HashSet getClient_countrys() { + return client_countrys; + } + + public void setClient_countrys(HashSet client_countrys) { + this.client_countrys = client_countrys; + } + public long getRecv_time() { return recv_time; } @@ -192,14 +211,6 @@ public class DosSketchLog implements Serializable { this.duration = duration; } - public Map getClientips_countrys() { - return clientips_countrys; - } - - public void setClientips_countrys(Map clientips_countrys) { - this.clientips_countrys = clientips_countrys; - } - public long getSession_rate() { return session_rate; } diff --git a/src/main/java/com/zdjizhi/function/DosDetectionFunction.java b/src/main/java/com/zdjizhi/function/DosDetectionFunction.java index 91623a4..190c20b 100644 --- a/src/main/java/com/zdjizhi/function/DosDetectionFunction.java +++ b/src/main/java/com/zdjizhi/function/DosDetectionFunction.java @@ -15,6 +15,7 @@ import org.apache.flink.util.Collector; import java.math.BigDecimal; import java.text.NumberFormat; import java.util.*; +import java.util.stream.Collectors; import static com.zdjizhi.conf.DosConfigs.*; @@ -162,22 +163,8 @@ public class DosDetectionFunction extends ProcessFunction> iterator = value.getClientips_countrys().entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); - client_ips.append(entry.getKey()); - client_countrys.append(entry.getValue()); - if (iterator.hasNext()) { - client_ips.append(","); - client_countrys.append(","); - } - } - if(client_ips.length()>0){ - dosEventLog.setSource_ip_list(client_ips.toString()); - dosEventLog.setSource_country_list(client_countrys.toString()); - } + dosEventLog.setSource_ip_list(value.getClient_ips().stream().filter(ip -> !ip.isEmpty()).collect(Collectors.joining(","))); + dosEventLog.setSource_country_list(value.getClient_countrys().stream().filter(ip -> !ip.isEmpty()).collect(Collectors.joining(","))); dosEventLog.setSession_rate(value.getSession_rate()); dosEventLog.setPacket_rate(value.getPacket_rate()); dosEventLog.setBit_rate(value.getBit_rate()); diff --git a/src/main/java/com/zdjizhi/function/FlatSketchFunction.java b/src/main/java/com/zdjizhi/function/FlatSketchFunction.java index 5c1de1d..f21944b 100644 --- a/src/main/java/com/zdjizhi/function/FlatSketchFunction.java +++ b/src/main/java/com/zdjizhi/function/FlatSketchFunction.java @@ -11,6 +11,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; public class FlatSketchFunction implements FlatMapFunction { @@ -24,34 +25,42 @@ public class FlatSketchFunction implements FlatMapFunction dosSketchLog.setRecv_time(System.currentTimeMillis()/1000); DosSketchMetricsLog dosSketchMetricsLog = JSONObject.parseObject(value, DosSketchMetricsLog.class); dosSketchLog.setVsys_id(Integer.parseInt(dosSketchMetricsLog.getTags().getOrDefault("vsys_id", "1"))); - dosSketchLog.setServer_ip(dosSketchMetricsLog.getTags().getOrDefault("server_ip", "")); - dosSketchLog.setDecoded_as(dosSketchMetricsLog.getTags().getOrDefault("decoded_as", "")); + dosSketchLog.setServer_ip(dosSketchMetricsLog.getTags().getOrDefault("server_ip", "").trim()); + dosSketchLog.setDecoded_as(dosSketchMetricsLog.getTags().getOrDefault("decoded_as", "").trim()); dosSketchLog.setDuration(Long.parseLong(dosSketchMetricsLog.getTags().getOrDefault("duration","60000"))); dosSketchLog.setTimestamp_ms(dosSketchMetricsLog.getTimestamp_ms()); dosSketchLog.setStart_timestamp_ms(dosSketchMetricsLog.getTimestamp_ms()); dosSketchLog.setEnd_timestamp_ms(dosSketchMetricsLog.getTimestamp_ms() + dosSketchLog.getDuration()); - dosSketchLog.setClient_ip(dosSketchMetricsLog.getTags().getOrDefault("client_ip", "")); - dosSketchLog.setData_center(dosSketchMetricsLog.getTags().getOrDefault("data_center", "")); - dosSketchLog.setDevice_id(dosSketchMetricsLog.getTags().getOrDefault("device_id", "")); - dosSketchLog.setDevice_group(dosSketchMetricsLog.getTags().getOrDefault("device_group", "")); - dosSketchLog.setServer_country(dosSketchMetricsLog.getTags().getOrDefault("server_country", "")); - dosSketchLog.setClient_country(dosSketchMetricsLog.getTags().getOrDefault("client_country", "")); + dosSketchLog.setClient_ip(dosSketchMetricsLog.getTags().getOrDefault("client_ip", "").trim()); + dosSketchLog.setData_center(dosSketchMetricsLog.getTags().getOrDefault("data_center", "").trim()); + dosSketchLog.setDevice_id(dosSketchMetricsLog.getTags().getOrDefault("device_id", "").trim()); + dosSketchLog.setDevice_group(dosSketchMetricsLog.getTags().getOrDefault("device_group", "").trim()); + dosSketchLog.setServer_country(dosSketchMetricsLog.getTags().getOrDefault("server_country", "").trim()); + dosSketchLog.setClient_country(dosSketchMetricsLog.getTags().getOrDefault("client_country", "").trim()); dosSketchLog.setRule_id(Integer.parseInt(dosSketchMetricsLog.getTags().getOrDefault("rule_id", "0"))); dosSketchLog.setName(dosSketchMetricsLog.getTags().getOrDefault("name", "")); - Map clientips_countrys = new HashMap<>(); - dosSketchLog.setClientips_countrys(clientips_countrys); + HashSet client_ips = new HashSet<>(); + HashSet client_countrys = new HashSet<>(); + dosSketchLog.setClient_ips(client_ips); + dosSketchLog.setClient_countrys(client_countrys); if("top_client_and_server_ip".equals(dosSketchMetricsLog.getName())){ dosSketchLog.setPkts(dosSketchMetricsLog.getFields().getOrDefault("pkts",0L)); dosSketchLog.setBytes(dosSketchMetricsLog.getFields().getOrDefault("bytes",0L)); dosSketchLog.setSessions(dosSketchMetricsLog.getFields().getOrDefault("sessions",0L)); - clientips_countrys.put(dosSketchLog.getClient_ip(),dosSketchLog.getClient_country()); + client_ips.add(dosSketchLog.getClient_ip()); + if(!dosSketchLog.getClient_country().isEmpty()) { + client_countrys.add(dosSketchLog.getClient_country()); + } } else if("top_client_ip_and_server_ip".equals(dosSketchMetricsLog.getName())){ dosSketchLog.setPkts(0); dosSketchLog.setBytes(0); dosSketchLog.setSessions(0); - clientips_countrys.put(dosSketchLog.getClient_ip(),dosSketchLog.getClient_country()); + client_ips.add(dosSketchLog.getClient_ip()); + if(!dosSketchLog.getClient_country().isEmpty()) { + client_countrys.add(dosSketchLog.getClient_country()); + } } else { dosSketchLog.setPkts(dosSketchMetricsLog.getFields().getOrDefault("pkts",0L)); diff --git a/src/main/java/com/zdjizhi/function/MetricsAggregationReduce.java b/src/main/java/com/zdjizhi/function/MetricsAggregationReduce.java index 6148755..fa19c63 100644 --- a/src/main/java/com/zdjizhi/function/MetricsAggregationReduce.java +++ b/src/main/java/com/zdjizhi/function/MetricsAggregationReduce.java @@ -19,7 +19,8 @@ public class MetricsAggregationReduce implements ReduceFunction { if (value1.getEnd_timestamp_ms() < value2.getEnd_timestamp_ms()) { value1.setEnd_timestamp_ms(value2.getEnd_timestamp_ms()); } - value1.getClientips_countrys().putAll((value2.getClientips_countrys())); + value1.getClient_ips().addAll(value2.getClient_ips()); + value2.getClient_countrys().addAll(value2.getClient_countrys()); return value1; } }