TSG-20680 对Source Country List中Country去重,为空的不记录
This commit is contained in:
4
pom.xml
4
pom.xml
@@ -6,7 +6,7 @@
|
|||||||
|
|
||||||
<groupId>com.zdjizhi</groupId>
|
<groupId>com.zdjizhi</groupId>
|
||||||
<artifactId>flink-dos-detection</artifactId>
|
<artifactId>flink-dos-detection</artifactId>
|
||||||
<version>2.1</version>
|
<version>24-04-19</version>
|
||||||
|
|
||||||
<name>flink-dos-detection</name>
|
<name>flink-dos-detection</name>
|
||||||
<url>http://www.example.com</url>
|
<url>http://www.example.com</url>
|
||||||
@@ -80,7 +80,7 @@
|
|||||||
</goals>
|
</goals>
|
||||||
|
|
||||||
<configuration>
|
<configuration>
|
||||||
<finalName>flink-dos-detection</finalName>
|
<finalName>flink-dos-detection-24-04-19</finalName>
|
||||||
<relocations>
|
<relocations>
|
||||||
<relocation>
|
<relocation>
|
||||||
<pattern>org.apache.http</pattern>
|
<pattern>org.apache.http</pattern>
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package com.zdjizhi.common;
|
package com.zdjizhi.common;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
|
||||||
@@ -25,7 +26,9 @@ public class DosSketchLog implements Serializable {
|
|||||||
private long start_timestamp_ms;
|
private long start_timestamp_ms;
|
||||||
private long end_timestamp_ms;
|
private long end_timestamp_ms;
|
||||||
private long duration;
|
private long duration;
|
||||||
private Map<String,String> clientips_countrys;
|
private HashSet<String> client_ips;
|
||||||
|
private HashSet<String> client_countrys;
|
||||||
|
|
||||||
private long session_rate;
|
private long session_rate;
|
||||||
private long packet_rate;
|
private long packet_rate;
|
||||||
private long bit_rate;
|
private long bit_rate;
|
||||||
@@ -48,6 +51,22 @@ public class DosSketchLog implements Serializable {
|
|||||||
'}';
|
'}';
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public HashSet<String> getClient_ips() {
|
||||||
|
return client_ips;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setClient_ips(HashSet<String> client_ips) {
|
||||||
|
this.client_ips = client_ips;
|
||||||
|
}
|
||||||
|
|
||||||
|
public HashSet<String> getClient_countrys() {
|
||||||
|
return client_countrys;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setClient_countrys(HashSet<String> client_countrys) {
|
||||||
|
this.client_countrys = client_countrys;
|
||||||
|
}
|
||||||
|
|
||||||
public long getRecv_time() {
|
public long getRecv_time() {
|
||||||
return recv_time;
|
return recv_time;
|
||||||
}
|
}
|
||||||
@@ -192,14 +211,6 @@ public class DosSketchLog implements Serializable {
|
|||||||
this.duration = duration;
|
this.duration = duration;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Map<String, String> getClientips_countrys() {
|
|
||||||
return clientips_countrys;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setClientips_countrys(Map<String, String> clientips_countrys) {
|
|
||||||
this.clientips_countrys = clientips_countrys;
|
|
||||||
}
|
|
||||||
|
|
||||||
public long getSession_rate() {
|
public long getSession_rate() {
|
||||||
return session_rate;
|
return session_rate;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -15,6 +15,7 @@ import org.apache.flink.util.Collector;
|
|||||||
import java.math.BigDecimal;
|
import java.math.BigDecimal;
|
||||||
import java.text.NumberFormat;
|
import java.text.NumberFormat;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static com.zdjizhi.conf.DosConfigs.*;
|
import static com.zdjizhi.conf.DosConfigs.*;
|
||||||
|
|
||||||
@@ -162,22 +163,8 @@ public class DosDetectionFunction extends ProcessFunction<DosSketchLog, DosEvent
|
|||||||
}
|
}
|
||||||
dosEventLog.setDestination_ip(value.getServer_ip());
|
dosEventLog.setDestination_ip(value.getServer_ip());
|
||||||
dosEventLog.setDestination_country(value.getServer_country());
|
dosEventLog.setDestination_country(value.getServer_country());
|
||||||
StringBuilder client_ips = new StringBuilder();
|
dosEventLog.setSource_ip_list(value.getClient_ips().stream().filter(ip -> !ip.isEmpty()).collect(Collectors.joining(",")));
|
||||||
StringBuilder client_countrys = new StringBuilder();
|
dosEventLog.setSource_country_list(value.getClient_countrys().stream().filter(ip -> !ip.isEmpty()).collect(Collectors.joining(",")));
|
||||||
Iterator<Map.Entry<String, String>> iterator = value.getClientips_countrys().entrySet().iterator();
|
|
||||||
while (iterator.hasNext()) {
|
|
||||||
Map.Entry<String, String> entry = iterator.next();
|
|
||||||
client_ips.append(entry.getKey());
|
|
||||||
client_countrys.append(entry.getValue());
|
|
||||||
if (iterator.hasNext()) {
|
|
||||||
client_ips.append(",");
|
|
||||||
client_countrys.append(",");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if(client_ips.length()>0){
|
|
||||||
dosEventLog.setSource_ip_list(client_ips.toString());
|
|
||||||
dosEventLog.setSource_country_list(client_countrys.toString());
|
|
||||||
}
|
|
||||||
dosEventLog.setSession_rate(value.getSession_rate());
|
dosEventLog.setSession_rate(value.getSession_rate());
|
||||||
dosEventLog.setPacket_rate(value.getPacket_rate());
|
dosEventLog.setPacket_rate(value.getPacket_rate());
|
||||||
dosEventLog.setBit_rate(value.getBit_rate());
|
dosEventLog.setBit_rate(value.getBit_rate());
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ import org.slf4j.LoggerFactory;
|
|||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
public class FlatSketchFunction implements FlatMapFunction<String, DosSketchLog> {
|
public class FlatSketchFunction implements FlatMapFunction<String, DosSketchLog> {
|
||||||
@@ -24,34 +25,42 @@ public class FlatSketchFunction implements FlatMapFunction<String, DosSketchLog>
|
|||||||
dosSketchLog.setRecv_time(System.currentTimeMillis()/1000);
|
dosSketchLog.setRecv_time(System.currentTimeMillis()/1000);
|
||||||
DosSketchMetricsLog dosSketchMetricsLog = JSONObject.parseObject(value, DosSketchMetricsLog.class);
|
DosSketchMetricsLog dosSketchMetricsLog = JSONObject.parseObject(value, DosSketchMetricsLog.class);
|
||||||
dosSketchLog.setVsys_id(Integer.parseInt(dosSketchMetricsLog.getTags().getOrDefault("vsys_id", "1")));
|
dosSketchLog.setVsys_id(Integer.parseInt(dosSketchMetricsLog.getTags().getOrDefault("vsys_id", "1")));
|
||||||
dosSketchLog.setServer_ip(dosSketchMetricsLog.getTags().getOrDefault("server_ip", ""));
|
dosSketchLog.setServer_ip(dosSketchMetricsLog.getTags().getOrDefault("server_ip", "").trim());
|
||||||
dosSketchLog.setDecoded_as(dosSketchMetricsLog.getTags().getOrDefault("decoded_as", ""));
|
dosSketchLog.setDecoded_as(dosSketchMetricsLog.getTags().getOrDefault("decoded_as", "").trim());
|
||||||
dosSketchLog.setDuration(Long.parseLong(dosSketchMetricsLog.getTags().getOrDefault("duration","60000")));
|
dosSketchLog.setDuration(Long.parseLong(dosSketchMetricsLog.getTags().getOrDefault("duration","60000")));
|
||||||
dosSketchLog.setTimestamp_ms(dosSketchMetricsLog.getTimestamp_ms());
|
dosSketchLog.setTimestamp_ms(dosSketchMetricsLog.getTimestamp_ms());
|
||||||
dosSketchLog.setStart_timestamp_ms(dosSketchMetricsLog.getTimestamp_ms());
|
dosSketchLog.setStart_timestamp_ms(dosSketchMetricsLog.getTimestamp_ms());
|
||||||
dosSketchLog.setEnd_timestamp_ms(dosSketchMetricsLog.getTimestamp_ms() + dosSketchLog.getDuration());
|
dosSketchLog.setEnd_timestamp_ms(dosSketchMetricsLog.getTimestamp_ms() + dosSketchLog.getDuration());
|
||||||
dosSketchLog.setClient_ip(dosSketchMetricsLog.getTags().getOrDefault("client_ip", ""));
|
dosSketchLog.setClient_ip(dosSketchMetricsLog.getTags().getOrDefault("client_ip", "").trim());
|
||||||
dosSketchLog.setData_center(dosSketchMetricsLog.getTags().getOrDefault("data_center", ""));
|
dosSketchLog.setData_center(dosSketchMetricsLog.getTags().getOrDefault("data_center", "").trim());
|
||||||
dosSketchLog.setDevice_id(dosSketchMetricsLog.getTags().getOrDefault("device_id", ""));
|
dosSketchLog.setDevice_id(dosSketchMetricsLog.getTags().getOrDefault("device_id", "").trim());
|
||||||
dosSketchLog.setDevice_group(dosSketchMetricsLog.getTags().getOrDefault("device_group", ""));
|
dosSketchLog.setDevice_group(dosSketchMetricsLog.getTags().getOrDefault("device_group", "").trim());
|
||||||
dosSketchLog.setServer_country(dosSketchMetricsLog.getTags().getOrDefault("server_country", ""));
|
dosSketchLog.setServer_country(dosSketchMetricsLog.getTags().getOrDefault("server_country", "").trim());
|
||||||
dosSketchLog.setClient_country(dosSketchMetricsLog.getTags().getOrDefault("client_country", ""));
|
dosSketchLog.setClient_country(dosSketchMetricsLog.getTags().getOrDefault("client_country", "").trim());
|
||||||
dosSketchLog.setRule_id(Integer.parseInt(dosSketchMetricsLog.getTags().getOrDefault("rule_id", "0")));
|
dosSketchLog.setRule_id(Integer.parseInt(dosSketchMetricsLog.getTags().getOrDefault("rule_id", "0")));
|
||||||
dosSketchLog.setName(dosSketchMetricsLog.getTags().getOrDefault("name", ""));
|
dosSketchLog.setName(dosSketchMetricsLog.getTags().getOrDefault("name", ""));
|
||||||
|
|
||||||
Map<String,String> clientips_countrys = new HashMap<>();
|
HashSet<String> client_ips = new HashSet<>();
|
||||||
dosSketchLog.setClientips_countrys(clientips_countrys);
|
HashSet<String> client_countrys = new HashSet<>();
|
||||||
|
dosSketchLog.setClient_ips(client_ips);
|
||||||
|
dosSketchLog.setClient_countrys(client_countrys);
|
||||||
if("top_client_and_server_ip".equals(dosSketchMetricsLog.getName())){
|
if("top_client_and_server_ip".equals(dosSketchMetricsLog.getName())){
|
||||||
dosSketchLog.setPkts(dosSketchMetricsLog.getFields().getOrDefault("pkts",0L));
|
dosSketchLog.setPkts(dosSketchMetricsLog.getFields().getOrDefault("pkts",0L));
|
||||||
dosSketchLog.setBytes(dosSketchMetricsLog.getFields().getOrDefault("bytes",0L));
|
dosSketchLog.setBytes(dosSketchMetricsLog.getFields().getOrDefault("bytes",0L));
|
||||||
dosSketchLog.setSessions(dosSketchMetricsLog.getFields().getOrDefault("sessions",0L));
|
dosSketchLog.setSessions(dosSketchMetricsLog.getFields().getOrDefault("sessions",0L));
|
||||||
clientips_countrys.put(dosSketchLog.getClient_ip(),dosSketchLog.getClient_country());
|
client_ips.add(dosSketchLog.getClient_ip());
|
||||||
|
if(!dosSketchLog.getClient_country().isEmpty()) {
|
||||||
|
client_countrys.add(dosSketchLog.getClient_country());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
else if("top_client_ip_and_server_ip".equals(dosSketchMetricsLog.getName())){
|
else if("top_client_ip_and_server_ip".equals(dosSketchMetricsLog.getName())){
|
||||||
dosSketchLog.setPkts(0);
|
dosSketchLog.setPkts(0);
|
||||||
dosSketchLog.setBytes(0);
|
dosSketchLog.setBytes(0);
|
||||||
dosSketchLog.setSessions(0);
|
dosSketchLog.setSessions(0);
|
||||||
clientips_countrys.put(dosSketchLog.getClient_ip(),dosSketchLog.getClient_country());
|
client_ips.add(dosSketchLog.getClient_ip());
|
||||||
|
if(!dosSketchLog.getClient_country().isEmpty()) {
|
||||||
|
client_countrys.add(dosSketchLog.getClient_country());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
dosSketchLog.setPkts(dosSketchMetricsLog.getFields().getOrDefault("pkts",0L));
|
dosSketchLog.setPkts(dosSketchMetricsLog.getFields().getOrDefault("pkts",0L));
|
||||||
|
|||||||
@@ -19,7 +19,8 @@ public class MetricsAggregationReduce implements ReduceFunction<DosSketchLog> {
|
|||||||
if (value1.getEnd_timestamp_ms() < value2.getEnd_timestamp_ms()) {
|
if (value1.getEnd_timestamp_ms() < value2.getEnd_timestamp_ms()) {
|
||||||
value1.setEnd_timestamp_ms(value2.getEnd_timestamp_ms());
|
value1.setEnd_timestamp_ms(value2.getEnd_timestamp_ms());
|
||||||
}
|
}
|
||||||
value1.getClientips_countrys().putAll((value2.getClientips_countrys()));
|
value1.getClient_ips().addAll(value2.getClient_ips());
|
||||||
|
value2.getClient_countrys().addAll(value2.getClient_countrys());
|
||||||
return value1;
|
return value1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user