3 Commits

8 changed files with 44 additions and 124 deletions

11
pom.xml
View File

@@ -6,7 +6,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>flink-dos-detection</artifactId>
<version>24-09-29</version>
<version>24-04-22</version>
<name>flink-dos-detection</name>
<url>http://www.example.com</url>
@@ -80,7 +80,7 @@
</goals>
<configuration>
<finalName>flink-dos-detection-24-09-29</finalName>
<finalName>flink-dos-detection-24-04-22</finalName>
<relocations>
<relocation>
<pattern>org.apache.http</pattern>
@@ -134,12 +134,7 @@
</build>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_2.12</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jasypt</groupId>
<artifactId>jasypt</artifactId>

View File

@@ -244,7 +244,7 @@ public class DosSketchLog implements Serializable {
}
public int getRule_id() {
public int getRule_id() {
return rule_id;
}

View File

@@ -111,9 +111,9 @@ public class DosDetectionFunction extends ProcessFunction<DosSketchLog, DosEvent
String attackType = value.getAttack_type();
long sketchSessionsRate = value.getSession_rate();
DosBaselineThreshold dosBaselineThreshold = baselineMap.get(key).get(attackType);
Integer baseSessionRate = getBaseValue(dosBaselineThreshold, value);
long diff = sketchSessionsRate - baseSessionRate;
return getDosEventLog(value, baseSessionRate, diff, 0, BASELINE_CONDITION_TYPE, SESSIONS_TAG);
Integer base = getBaseValue(dosBaselineThreshold, value);
long diff = sketchSessionsRate - base;
return getDosEventLog(value, base, diff, 0, BASELINE_CONDITION_TYPE, SESSIONS_TAG);
}
private DosEventLog getDosEventLog(DosSketchLog value, long base, long diff, long profileId, int type, String tag) {

View File

@@ -58,10 +58,7 @@ public class DosMetricsRichFunction extends RichFlatMapFunction<DosSketchLog, St
}
private int getPartitionNumByIp(String destinationIp) {
if(destinationIp!=null){
return Math.abs(destinationIp.hashCode()) % configuration.get(DESTINATION_IP_PARTITION_NUM);
}
return 0;
return Math.abs(destinationIp.hashCode()) % configuration.get(DESTINATION_IP_PARTITION_NUM);
}
}

View File

@@ -21,38 +21,52 @@ public class FlatSketchFunction implements FlatMapFunction<String, DosSketchLog>
try {
if (StringUtil.isNotBlank(value)) {
DosSketchLog dosSketchLog = JSONObject.parseObject(value, DosSketchLog.class);
DosSketchLog dosSketchLog = new DosSketchLog();
dosSketchLog.setRecv_time(System.currentTimeMillis()/1000);
dosSketchLog.setStart_timestamp_ms(dosSketchLog.getTimestamp_ms());
if(dosSketchLog.getDuration()<=0){
dosSketchLog.setDuration(60000);
}
dosSketchLog.setEnd_timestamp_ms(dosSketchLog.getTimestamp_ms() + dosSketchLog.getDuration());
DosSketchMetricsLog dosSketchMetricsLog = JSONObject.parseObject(value, DosSketchMetricsLog.class);
dosSketchLog.setVsys_id(Integer.parseInt(dosSketchMetricsLog.getTags().getOrDefault("vsys_id", "1")));
dosSketchLog.setServer_ip(dosSketchMetricsLog.getTags().getOrDefault("server_ip", "").trim());
dosSketchLog.setDecoded_as(dosSketchMetricsLog.getTags().getOrDefault("decoded_as", "").trim());
dosSketchLog.setDuration(Long.parseLong(dosSketchMetricsLog.getTags().getOrDefault("duration","60000")));
dosSketchLog.setTimestamp_ms(dosSketchMetricsLog.getTimestamp_ms());
dosSketchLog.setStart_timestamp_ms(dosSketchMetricsLog.getTimestamp_ms());
dosSketchLog.setEnd_timestamp_ms(dosSketchMetricsLog.getTimestamp_ms() + dosSketchLog.getDuration());
dosSketchLog.setClient_ip(dosSketchMetricsLog.getTags().getOrDefault("client_ip", "").trim());
dosSketchLog.setData_center(dosSketchMetricsLog.getTags().getOrDefault("data_center", "").trim());
dosSketchLog.setDevice_id(dosSketchMetricsLog.getTags().getOrDefault("device_id", "").trim());
dosSketchLog.setDevice_group(dosSketchMetricsLog.getTags().getOrDefault("device_group", "").trim());
dosSketchLog.setServer_country(dosSketchMetricsLog.getTags().getOrDefault("server_country", "").trim());
dosSketchLog.setClient_country(dosSketchMetricsLog.getTags().getOrDefault("client_country", "").trim());
dosSketchLog.setRule_id(Integer.parseInt(dosSketchMetricsLog.getTags().getOrDefault("rule_id", "0")));
dosSketchLog.setName(dosSketchMetricsLog.getTags().getOrDefault("name", ""));
HashSet<String> client_ips = new HashSet<>();
HashSet<String> client_countrys = new HashSet<>();
dosSketchLog.setClient_ips(client_ips);
dosSketchLog.setClient_countrys(client_countrys);
if("top_client_and_server_ip".equals(dosSketchLog.getName())){
dosSketchLog.setDecoded_as("");
if(dosSketchLog.getClient_ip()!=null) {
client_ips.add(dosSketchLog.getClient_ip());
}
if(dosSketchLog.getClient_country()!=null && !dosSketchLog.getClient_country().isEmpty()) {
if("top_client_and_server_ip".equals(dosSketchMetricsLog.getName())){
dosSketchLog.setPkts(dosSketchMetricsLog.getFields().getOrDefault("pkts",0L));
dosSketchLog.setBytes(dosSketchMetricsLog.getFields().getOrDefault("bytes",0L));
dosSketchLog.setSessions(dosSketchMetricsLog.getFields().getOrDefault("sessions",0L));
client_ips.add(dosSketchLog.getClient_ip());
if(!dosSketchLog.getClient_country().isEmpty()) {
client_countrys.add(dosSketchLog.getClient_country());
}
}
else if("top_client_ip_and_server_ip".equals(dosSketchLog.getName())){
else if("top_client_ip_and_server_ip".equals(dosSketchMetricsLog.getName())){
dosSketchLog.setPkts(0);
dosSketchLog.setBytes(0);
dosSketchLog.setSessions(0);
if(dosSketchLog.getClient_ip()!=null) {
client_ips.add(dosSketchLog.getClient_ip());
}
if(dosSketchLog.getClient_country()!=null && !dosSketchLog.getClient_country().isEmpty()) {
client_ips.add(dosSketchLog.getClient_ip());
if(!dosSketchLog.getClient_country().isEmpty()) {
client_countrys.add(dosSketchLog.getClient_country());
}
}
else {
dosSketchLog.setPkts(dosSketchMetricsLog.getFields().getOrDefault("pkts",0L));
dosSketchLog.setBytes(dosSketchMetricsLog.getFields().getOrDefault("bytes",0L));
dosSketchLog.setSessions(dosSketchMetricsLog.getFields().getOrDefault("sessions",0L));
}
out.collect(dosSketchLog);
}
} catch (Exception e) {

View File

@@ -45,7 +45,7 @@ public class MetricsCalculate extends ProcessWindowFunction<
dosSketchLog.setSession_rate(dosSketchLog.getSessions()/ (duration/1000) );
dosSketchLog.setPacket_rate(dosSketchLog.getPkts()/(duration/1000));
dosSketchLog.setBit_rate(dosSketchLog.getBytes()*8/(duration/1000));
dosSketchLog.setAttack_type(attackTypeMapping.get(dosSketchLog.getDecoded_as()));
dosSketchLog.setAttack_type(attackTypeMapping.getOrDefault(dosSketchLog.getDecoded_as(),""));
}catch (RuntimeException e){
logger.error(e.toString());
}

View File

@@ -56,7 +56,7 @@ public class DosDetectionTest {
long pktBase=dosDetectionThreshold.getPackets_per_sec();
long bitBase=dosDetectionThreshold.getBits_per_sec();
//基于速率进行计算
long diffSession = dosSketchLog.getSession_rate() - sessionBase;
long diffSession = dosSketchLog.getSessions() - sessionBase;
long diffPkt = dosSketchLog.getPkts() - pktBase;
long diffByte = dosSketchLog.getBytes() - bitBase;
@@ -121,7 +121,7 @@ public class DosDetectionTest {
dosEventLog.setAttack_type(value.getAttack_type());
dosEventLog.setSeverity(severity.severity);
// dosEventLog.setConditions(getConditions(PERCENT_INSTANCE.format(percent), base, value.getSketch_sessions(), type, tag));
dosEventLog.setConditions(getConditions(percent, base, value.getSession_rate(), type, tag,dosEventLog));
dosEventLog.setConditions(getConditions(percent, base, value.getSessions(), type, tag,dosEventLog));
dosEventLog.setDestination_ip(value.getServer_ip());
// dosEventLog.setDestination_country(IpUtils.ipLookup.countryLookup(value.getDestination_ip()));
String ipList = value.getClient_ip();

View File

@@ -1,97 +1,11 @@
package com.zdjizhi.etl;
import com.zdjizhi.common.DosSketchLog;
import com.zdjizhi.function.FlatSketchFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.junit.Assert.*;
public class EtlProcessFunctionTest {
@ClassRule
public static MiniClusterWithClientResource flinkCluster =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberSlotsPerTaskManager(1)
.setNumberTaskManagers(1)
.build());
@Test
public void testIncrementPipeline() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
public void EtlProcessFunctionTest(){
String jsonString1 = "{\"rule_id\":9055,\"server_ip\":\"192.64.30.9\",\"client_country\":\"US\",\"server_country\":\"CA\",\"name\":\"top_client_and_server_ip\",\"vsys_id\":1,\"device_id\":\"9800165603247024\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"sessions\":89,\"pkts\":89,\"bytes\":5874,\"timestamp_ms\":1723014831175}";
String jsonString2 = "{\"server_ip\":\"192.168.41.32\",\"decoded_as\":\"UDP\",\"name\":\"top_server_ip\",\"device_id\":\"21426003\",\"device_group\":\"group-xxg-9140\",\"data_center\":\"center-xxg-9140\",\"vsys_id\":1,\"duration\":60000,\"sessions\":5,\"bytes\":350,\"pkts\":5,\"timestamp_ms\":1723014960000}";
String jsonString3 = "{\"client_ip\":\"10.64.23.157\",\"server_ip\":\"10.64.127.184\",\"decoded_as\":\"TCP SYN\",\"name\":\"top_client_ip_and_server_ip\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"duration\":60000,\"sessions\":4,\"bytes\":264,\"pkts\":4,\"timestamp_ms\":1723014900010}";
env.setParallelism(1);
CollectSink.values.clear();
ParameterTool serviceConfig = ParameterTool.fromPropertiesFile("src\\main\\resources\\detection_dos_attack.properties");
Configuration configurationService = serviceConfig.getConfiguration();
// global register
env.getConfig().setGlobalJobParameters(configurationService);
env.fromElements(jsonString1,jsonString2,jsonString3)
.flatMap(new FlatSketchFunction())
.addSink(new CollectSink());
// execute
env.execute();
assertEquals("", CollectSink.values.get(0).getDecoded_as());
assertEquals("192.64.30.9", CollectSink.values.get(0).getServer_ip());
assertEquals(9055, CollectSink.values.get(0).getRule_id());
assertTrue(CollectSink.values.get(0).getClient_countrys().contains("US"));
assertEquals("CA",CollectSink.values.get(0).getServer_country());
assertEquals("top_client_and_server_ip",CollectSink.values.get(0).getName());
assertEquals(60000,CollectSink.values.get(0).getDuration());
assertEquals(1,CollectSink.values.get(0).getVsys_id());
assertEquals("9800165603247024",CollectSink.values.get(0).getDevice_id());
assertEquals("group-xxg-tsgx",CollectSink.values.get(0).getDevice_group());
assertEquals("center-xxg-tsgx",CollectSink.values.get(0).getData_center());
assertEquals(89,CollectSink.values.get(0).getSessions());
assertEquals(89,CollectSink.values.get(0).getPkts());
assertEquals(5874,CollectSink.values.get(0).getBytes());
assertEquals(1723014831175L,CollectSink.values.get(0).getStart_timestamp_ms());
assertEquals("UDP", CollectSink.values.get(1).getDecoded_as());
assertEquals("TCP SYN", CollectSink.values.get(2).getDecoded_as());
assertNull(CollectSink.values.get(0).getClient_ip());
assertEquals(5,CollectSink.values.get(1).getSessions());
assertEquals(5,CollectSink.values.get(1).getPkts());
assertEquals(350,CollectSink.values.get(1).getBytes());
assertEquals(0,CollectSink.values.get(1).getClient_countrys().size());
assertEquals(0,CollectSink.values.get(2).getSessions());
assertEquals(0,CollectSink.values.get(2).getPkts());
assertEquals(0,CollectSink.values.get(2).getBytes());
}
// create a testing sink
private static class CollectSink implements SinkFunction<DosSketchLog> {
// must be static
public static final List<DosSketchLog> values = Collections.synchronizedList(new ArrayList<>());
@Override
public void invoke(DosSketchLog value, Context context) throws Exception {
values.add(value);
}
}
}