Compare commits
3 Commits
feature/24
...
feature/24
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1de0c27c36 | ||
|
|
9c21e01211 | ||
|
|
548a08d90a |
11
pom.xml
11
pom.xml
@@ -6,7 +6,7 @@
|
||||
|
||||
<groupId>com.zdjizhi</groupId>
|
||||
<artifactId>flink-dos-detection</artifactId>
|
||||
<version>24-09-29</version>
|
||||
<version>24-04-22</version>
|
||||
|
||||
<name>flink-dos-detection</name>
|
||||
<url>http://www.example.com</url>
|
||||
@@ -80,7 +80,7 @@
|
||||
</goals>
|
||||
|
||||
<configuration>
|
||||
<finalName>flink-dos-detection-24-09-29</finalName>
|
||||
<finalName>flink-dos-detection-24-04-22</finalName>
|
||||
<relocations>
|
||||
<relocation>
|
||||
<pattern>org.apache.http</pattern>
|
||||
@@ -134,12 +134,7 @@
|
||||
</build>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-test-utils_2.12</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.jasypt</groupId>
|
||||
<artifactId>jasypt</artifactId>
|
||||
|
||||
@@ -244,7 +244,7 @@ public class DosSketchLog implements Serializable {
|
||||
}
|
||||
|
||||
|
||||
public int getRule_id() {
|
||||
public int getRule_id() {
|
||||
return rule_id;
|
||||
}
|
||||
|
||||
|
||||
@@ -111,9 +111,9 @@ public class DosDetectionFunction extends ProcessFunction<DosSketchLog, DosEvent
|
||||
String attackType = value.getAttack_type();
|
||||
long sketchSessionsRate = value.getSession_rate();
|
||||
DosBaselineThreshold dosBaselineThreshold = baselineMap.get(key).get(attackType);
|
||||
Integer baseSessionRate = getBaseValue(dosBaselineThreshold, value);
|
||||
long diff = sketchSessionsRate - baseSessionRate;
|
||||
return getDosEventLog(value, baseSessionRate, diff, 0, BASELINE_CONDITION_TYPE, SESSIONS_TAG);
|
||||
Integer base = getBaseValue(dosBaselineThreshold, value);
|
||||
long diff = sketchSessionsRate - base;
|
||||
return getDosEventLog(value, base, diff, 0, BASELINE_CONDITION_TYPE, SESSIONS_TAG);
|
||||
}
|
||||
|
||||
private DosEventLog getDosEventLog(DosSketchLog value, long base, long diff, long profileId, int type, String tag) {
|
||||
|
||||
@@ -58,10 +58,7 @@ public class DosMetricsRichFunction extends RichFlatMapFunction<DosSketchLog, St
|
||||
}
|
||||
|
||||
private int getPartitionNumByIp(String destinationIp) {
|
||||
if(destinationIp!=null){
|
||||
return Math.abs(destinationIp.hashCode()) % configuration.get(DESTINATION_IP_PARTITION_NUM);
|
||||
}
|
||||
return 0;
|
||||
return Math.abs(destinationIp.hashCode()) % configuration.get(DESTINATION_IP_PARTITION_NUM);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -21,38 +21,52 @@ public class FlatSketchFunction implements FlatMapFunction<String, DosSketchLog>
|
||||
|
||||
try {
|
||||
if (StringUtil.isNotBlank(value)) {
|
||||
DosSketchLog dosSketchLog = JSONObject.parseObject(value, DosSketchLog.class);
|
||||
DosSketchLog dosSketchLog = new DosSketchLog();
|
||||
dosSketchLog.setRecv_time(System.currentTimeMillis()/1000);
|
||||
dosSketchLog.setStart_timestamp_ms(dosSketchLog.getTimestamp_ms());
|
||||
if(dosSketchLog.getDuration()<=0){
|
||||
dosSketchLog.setDuration(60000);
|
||||
}
|
||||
dosSketchLog.setEnd_timestamp_ms(dosSketchLog.getTimestamp_ms() + dosSketchLog.getDuration());
|
||||
DosSketchMetricsLog dosSketchMetricsLog = JSONObject.parseObject(value, DosSketchMetricsLog.class);
|
||||
dosSketchLog.setVsys_id(Integer.parseInt(dosSketchMetricsLog.getTags().getOrDefault("vsys_id", "1")));
|
||||
dosSketchLog.setServer_ip(dosSketchMetricsLog.getTags().getOrDefault("server_ip", "").trim());
|
||||
dosSketchLog.setDecoded_as(dosSketchMetricsLog.getTags().getOrDefault("decoded_as", "").trim());
|
||||
dosSketchLog.setDuration(Long.parseLong(dosSketchMetricsLog.getTags().getOrDefault("duration","60000")));
|
||||
dosSketchLog.setTimestamp_ms(dosSketchMetricsLog.getTimestamp_ms());
|
||||
dosSketchLog.setStart_timestamp_ms(dosSketchMetricsLog.getTimestamp_ms());
|
||||
dosSketchLog.setEnd_timestamp_ms(dosSketchMetricsLog.getTimestamp_ms() + dosSketchLog.getDuration());
|
||||
dosSketchLog.setClient_ip(dosSketchMetricsLog.getTags().getOrDefault("client_ip", "").trim());
|
||||
dosSketchLog.setData_center(dosSketchMetricsLog.getTags().getOrDefault("data_center", "").trim());
|
||||
dosSketchLog.setDevice_id(dosSketchMetricsLog.getTags().getOrDefault("device_id", "").trim());
|
||||
dosSketchLog.setDevice_group(dosSketchMetricsLog.getTags().getOrDefault("device_group", "").trim());
|
||||
dosSketchLog.setServer_country(dosSketchMetricsLog.getTags().getOrDefault("server_country", "").trim());
|
||||
dosSketchLog.setClient_country(dosSketchMetricsLog.getTags().getOrDefault("client_country", "").trim());
|
||||
dosSketchLog.setRule_id(Integer.parseInt(dosSketchMetricsLog.getTags().getOrDefault("rule_id", "0")));
|
||||
dosSketchLog.setName(dosSketchMetricsLog.getTags().getOrDefault("name", ""));
|
||||
|
||||
HashSet<String> client_ips = new HashSet<>();
|
||||
HashSet<String> client_countrys = new HashSet<>();
|
||||
dosSketchLog.setClient_ips(client_ips);
|
||||
dosSketchLog.setClient_countrys(client_countrys);
|
||||
|
||||
if("top_client_and_server_ip".equals(dosSketchLog.getName())){
|
||||
dosSketchLog.setDecoded_as("");
|
||||
if(dosSketchLog.getClient_ip()!=null) {
|
||||
client_ips.add(dosSketchLog.getClient_ip());
|
||||
}
|
||||
if(dosSketchLog.getClient_country()!=null && !dosSketchLog.getClient_country().isEmpty()) {
|
||||
if("top_client_and_server_ip".equals(dosSketchMetricsLog.getName())){
|
||||
dosSketchLog.setPkts(dosSketchMetricsLog.getFields().getOrDefault("pkts",0L));
|
||||
dosSketchLog.setBytes(dosSketchMetricsLog.getFields().getOrDefault("bytes",0L));
|
||||
dosSketchLog.setSessions(dosSketchMetricsLog.getFields().getOrDefault("sessions",0L));
|
||||
client_ips.add(dosSketchLog.getClient_ip());
|
||||
if(!dosSketchLog.getClient_country().isEmpty()) {
|
||||
client_countrys.add(dosSketchLog.getClient_country());
|
||||
}
|
||||
}
|
||||
else if("top_client_ip_and_server_ip".equals(dosSketchLog.getName())){
|
||||
else if("top_client_ip_and_server_ip".equals(dosSketchMetricsLog.getName())){
|
||||
dosSketchLog.setPkts(0);
|
||||
dosSketchLog.setBytes(0);
|
||||
dosSketchLog.setSessions(0);
|
||||
if(dosSketchLog.getClient_ip()!=null) {
|
||||
client_ips.add(dosSketchLog.getClient_ip());
|
||||
}
|
||||
if(dosSketchLog.getClient_country()!=null && !dosSketchLog.getClient_country().isEmpty()) {
|
||||
client_ips.add(dosSketchLog.getClient_ip());
|
||||
if(!dosSketchLog.getClient_country().isEmpty()) {
|
||||
client_countrys.add(dosSketchLog.getClient_country());
|
||||
}
|
||||
}
|
||||
else {
|
||||
dosSketchLog.setPkts(dosSketchMetricsLog.getFields().getOrDefault("pkts",0L));
|
||||
dosSketchLog.setBytes(dosSketchMetricsLog.getFields().getOrDefault("bytes",0L));
|
||||
dosSketchLog.setSessions(dosSketchMetricsLog.getFields().getOrDefault("sessions",0L));
|
||||
}
|
||||
out.collect(dosSketchLog);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
|
||||
@@ -45,7 +45,7 @@ public class MetricsCalculate extends ProcessWindowFunction<
|
||||
dosSketchLog.setSession_rate(dosSketchLog.getSessions()/ (duration/1000) );
|
||||
dosSketchLog.setPacket_rate(dosSketchLog.getPkts()/(duration/1000));
|
||||
dosSketchLog.setBit_rate(dosSketchLog.getBytes()*8/(duration/1000));
|
||||
dosSketchLog.setAttack_type(attackTypeMapping.get(dosSketchLog.getDecoded_as()));
|
||||
dosSketchLog.setAttack_type(attackTypeMapping.getOrDefault(dosSketchLog.getDecoded_as(),""));
|
||||
}catch (RuntimeException e){
|
||||
logger.error(e.toString());
|
||||
}
|
||||
|
||||
@@ -56,7 +56,7 @@ public class DosDetectionTest {
|
||||
long pktBase=dosDetectionThreshold.getPackets_per_sec();
|
||||
long bitBase=dosDetectionThreshold.getBits_per_sec();
|
||||
//基于速率进行计算
|
||||
long diffSession = dosSketchLog.getSession_rate() - sessionBase;
|
||||
long diffSession = dosSketchLog.getSessions() - sessionBase;
|
||||
long diffPkt = dosSketchLog.getPkts() - pktBase;
|
||||
long diffByte = dosSketchLog.getBytes() - bitBase;
|
||||
|
||||
@@ -121,7 +121,7 @@ public class DosDetectionTest {
|
||||
dosEventLog.setAttack_type(value.getAttack_type());
|
||||
dosEventLog.setSeverity(severity.severity);
|
||||
// dosEventLog.setConditions(getConditions(PERCENT_INSTANCE.format(percent), base, value.getSketch_sessions(), type, tag));
|
||||
dosEventLog.setConditions(getConditions(percent, base, value.getSession_rate(), type, tag,dosEventLog));
|
||||
dosEventLog.setConditions(getConditions(percent, base, value.getSessions(), type, tag,dosEventLog));
|
||||
dosEventLog.setDestination_ip(value.getServer_ip());
|
||||
// dosEventLog.setDestination_country(IpUtils.ipLookup.countryLookup(value.getDestination_ip()));
|
||||
String ipList = value.getClient_ip();
|
||||
|
||||
@@ -1,97 +1,11 @@
|
||||
package com.zdjizhi.etl;
|
||||
|
||||
import com.zdjizhi.common.DosSketchLog;
|
||||
import com.zdjizhi.function.FlatSketchFunction;
|
||||
import org.apache.flink.api.java.utils.ParameterTool;
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
|
||||
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
||||
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
|
||||
import org.apache.flink.test.util.MiniClusterWithClientResource;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
public class EtlProcessFunctionTest {
|
||||
@ClassRule
|
||||
public static MiniClusterWithClientResource flinkCluster =
|
||||
new MiniClusterWithClientResource(
|
||||
new MiniClusterResourceConfiguration.Builder()
|
||||
.setNumberSlotsPerTaskManager(1)
|
||||
.setNumberTaskManagers(1)
|
||||
.build());
|
||||
|
||||
|
||||
|
||||
|
||||
@Test
|
||||
public void testIncrementPipeline() throws Exception {
|
||||
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
|
||||
public void EtlProcessFunctionTest(){
|
||||
|
||||
|
||||
String jsonString1 = "{\"rule_id\":9055,\"server_ip\":\"192.64.30.9\",\"client_country\":\"US\",\"server_country\":\"CA\",\"name\":\"top_client_and_server_ip\",\"vsys_id\":1,\"device_id\":\"9800165603247024\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"sessions\":89,\"pkts\":89,\"bytes\":5874,\"timestamp_ms\":1723014831175}";
|
||||
String jsonString2 = "{\"server_ip\":\"192.168.41.32\",\"decoded_as\":\"UDP\",\"name\":\"top_server_ip\",\"device_id\":\"21426003\",\"device_group\":\"group-xxg-9140\",\"data_center\":\"center-xxg-9140\",\"vsys_id\":1,\"duration\":60000,\"sessions\":5,\"bytes\":350,\"pkts\":5,\"timestamp_ms\":1723014960000}";
|
||||
String jsonString3 = "{\"client_ip\":\"10.64.23.157\",\"server_ip\":\"10.64.127.184\",\"decoded_as\":\"TCP SYN\",\"name\":\"top_client_ip_and_server_ip\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"duration\":60000,\"sessions\":4,\"bytes\":264,\"pkts\":4,\"timestamp_ms\":1723014900010}";
|
||||
|
||||
env.setParallelism(1);
|
||||
CollectSink.values.clear();
|
||||
|
||||
ParameterTool serviceConfig = ParameterTool.fromPropertiesFile("src\\main\\resources\\detection_dos_attack.properties");
|
||||
Configuration configurationService = serviceConfig.getConfiguration();
|
||||
// global register
|
||||
env.getConfig().setGlobalJobParameters(configurationService);
|
||||
|
||||
|
||||
env.fromElements(jsonString1,jsonString2,jsonString3)
|
||||
.flatMap(new FlatSketchFunction())
|
||||
.addSink(new CollectSink());
|
||||
|
||||
// execute
|
||||
env.execute();
|
||||
assertEquals("", CollectSink.values.get(0).getDecoded_as());
|
||||
assertEquals("192.64.30.9", CollectSink.values.get(0).getServer_ip());
|
||||
assertEquals(9055, CollectSink.values.get(0).getRule_id());
|
||||
assertTrue(CollectSink.values.get(0).getClient_countrys().contains("US"));
|
||||
assertEquals("CA",CollectSink.values.get(0).getServer_country());
|
||||
assertEquals("top_client_and_server_ip",CollectSink.values.get(0).getName());
|
||||
assertEquals(60000,CollectSink.values.get(0).getDuration());
|
||||
assertEquals(1,CollectSink.values.get(0).getVsys_id());
|
||||
assertEquals("9800165603247024",CollectSink.values.get(0).getDevice_id());
|
||||
assertEquals("group-xxg-tsgx",CollectSink.values.get(0).getDevice_group());
|
||||
assertEquals("center-xxg-tsgx",CollectSink.values.get(0).getData_center());
|
||||
assertEquals(89,CollectSink.values.get(0).getSessions());
|
||||
assertEquals(89,CollectSink.values.get(0).getPkts());
|
||||
assertEquals(5874,CollectSink.values.get(0).getBytes());
|
||||
assertEquals(1723014831175L,CollectSink.values.get(0).getStart_timestamp_ms());
|
||||
assertEquals("UDP", CollectSink.values.get(1).getDecoded_as());
|
||||
assertEquals("TCP SYN", CollectSink.values.get(2).getDecoded_as());
|
||||
assertNull(CollectSink.values.get(0).getClient_ip());
|
||||
assertEquals(5,CollectSink.values.get(1).getSessions());
|
||||
assertEquals(5,CollectSink.values.get(1).getPkts());
|
||||
assertEquals(350,CollectSink.values.get(1).getBytes());
|
||||
assertEquals(0,CollectSink.values.get(1).getClient_countrys().size());
|
||||
assertEquals(0,CollectSink.values.get(2).getSessions());
|
||||
assertEquals(0,CollectSink.values.get(2).getPkts());
|
||||
assertEquals(0,CollectSink.values.get(2).getBytes());
|
||||
}
|
||||
|
||||
// create a testing sink
|
||||
private static class CollectSink implements SinkFunction<DosSketchLog> {
|
||||
|
||||
// must be static
|
||||
public static final List<DosSketchLog> values = Collections.synchronizedList(new ArrayList<>());
|
||||
|
||||
@Override
|
||||
public void invoke(DosSketchLog value, Context context) throws Exception {
|
||||
values.add(value);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user