OLAP DoS Detection重组日志结构适配。(TSG-17836)
This commit is contained in:
2
pom.xml
2
pom.xml
@@ -6,7 +6,7 @@
|
||||
|
||||
<groupId>com.zdjizhi</groupId>
|
||||
<artifactId>flink-dos-detection</artifactId>
|
||||
<version>23.11</version>
|
||||
<version>23.12</version>
|
||||
|
||||
<name>flink-dos-detection</name>
|
||||
<url>http://www.example.com</url>
|
||||
|
||||
@@ -2,8 +2,8 @@ package com.zdjizhi.common;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
public class DosEventLog implements Serializable,Cloneable {
|
||||
|
||||
public class DosEventLog implements Serializable, Cloneable {
|
||||
private long recv_time;
|
||||
private long log_id;
|
||||
private int vsys_id;
|
||||
private long start_time;
|
||||
@@ -20,6 +20,14 @@ public class DosEventLog implements Serializable,Cloneable {
|
||||
private long packet_rate;
|
||||
private long bit_rate;
|
||||
|
||||
public long getRecv_time() {
|
||||
return recv_time;
|
||||
}
|
||||
|
||||
public void setRecv_time(long recv_time) {
|
||||
this.recv_time = recv_time;
|
||||
}
|
||||
|
||||
public long getLog_id() {
|
||||
return log_id;
|
||||
}
|
||||
@@ -143,7 +151,8 @@ public class DosEventLog implements Serializable,Cloneable {
|
||||
@Override
|
||||
public String toString() {
|
||||
return "DosEventLog{" +
|
||||
"log_id=" + log_id +
|
||||
"recv_time=" + recv_time +
|
||||
", log_id=" + log_id +
|
||||
", vsys_id=" + vsys_id +
|
||||
", start_time=" + start_time +
|
||||
", end_time=" + end_time +
|
||||
|
||||
@@ -5,6 +5,7 @@ import java.util.Objects;
|
||||
|
||||
public class DosSketchLog implements Serializable {
|
||||
|
||||
private long common_recv_time;
|
||||
private String common_sled_ip;
|
||||
private String common_data_center;
|
||||
private long sketch_start_time;
|
||||
@@ -17,10 +18,12 @@ public class DosSketchLog implements Serializable {
|
||||
private long sketch_bytes;
|
||||
private int vsys_id;
|
||||
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "DosSketchLog{" +
|
||||
"common_sled_ip='" + common_sled_ip + '\'' +
|
||||
"common_recv_time=" + common_recv_time +
|
||||
", common_sled_ip='" + common_sled_ip + '\'' +
|
||||
", common_data_center='" + common_data_center + '\'' +
|
||||
", sketch_start_time=" + sketch_start_time +
|
||||
", sketch_duration=" + sketch_duration +
|
||||
@@ -34,6 +37,14 @@ public class DosSketchLog implements Serializable {
|
||||
'}';
|
||||
}
|
||||
|
||||
public long getCommon_recv_time() {
|
||||
return common_recv_time;
|
||||
}
|
||||
|
||||
public void setCommon_recv_time(long common_recv_time) {
|
||||
this.common_recv_time = common_recv_time;
|
||||
}
|
||||
|
||||
public String getCommon_sled_ip() {
|
||||
return common_sled_ip;
|
||||
}
|
||||
|
||||
@@ -185,6 +185,7 @@ public class DosDetection extends ProcessFunction<DosSketchLog, DosEventLog> {
|
||||
|
||||
private DosEventLog getResult(DosSketchLog value, long base, long profileId, Severity severity, double percent, int type, String tag) {
|
||||
DosEventLog dosEventLog = new DosEventLog();
|
||||
dosEventLog.setRecv_time(value.getCommon_recv_time());
|
||||
dosEventLog.setLog_id(SnowflakeId.generateId());
|
||||
dosEventLog.setVsys_id(value.getVsys_id());
|
||||
dosEventLog.setStart_time(value.getSketch_start_time());
|
||||
|
||||
@@ -43,15 +43,21 @@ public class ParseSketchLog {
|
||||
public void flatMap(String s, Collector<DosSketchLog> collector) {
|
||||
try {
|
||||
if (StringUtil.isNotBlank(s)) {
|
||||
|
||||
final long recv_time = System.currentTimeMillis();
|
||||
|
||||
HashMap<String, Object> sketchSource = JSONObject.parseObject(s, HashMap.class);
|
||||
long sketchStartTime = Long.parseLong(sketchSource.get("sketch_start_time").toString());
|
||||
long sketchDuration = Long.parseLong(sketchSource.get("sketch_duration").toString());
|
||||
String attackType = sketchSource.get("attack_type").toString();
|
||||
int vsysId = Integer.parseInt(sketchSource.getOrDefault("common_vsys_id", 1).toString());
|
||||
String report_ip_list = JSONObject.toJSONString(sketchSource.get("report_ip_list"));
|
||||
|
||||
ArrayList<HashMap<String, Object>> reportIpList = JSONObject.parseObject(report_ip_list, ArrayList.class);
|
||||
for (HashMap<String, Object> obj : reportIpList) {
|
||||
|
||||
for (HashMap<String, Object> obj : reportIpList) {
|
||||
DosSketchLog dosSketchLog = new DosSketchLog();
|
||||
dosSketchLog.setCommon_recv_time(recv_time);
|
||||
dosSketchLog.setSketch_start_time(sketchStartTime);
|
||||
dosSketchLog.setSketch_duration(sketchDuration);
|
||||
dosSketchLog.setAttack_type(attackType);
|
||||
|
||||
@@ -11,8 +11,8 @@ kafka.input.parallelism=3
|
||||
kafka.input.topic.name=DOS-SKETCH-RECORD
|
||||
|
||||
#输入kafka地址
|
||||
#kafka.input.bootstrap.servers=192.168.44.12:9094
|
||||
kafka.input.bootstrap.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094
|
||||
kafka.input.bootstrap.servers=192.168.44.12:9094
|
||||
#kafka.input.bootstrap.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094
|
||||
|
||||
#读取kafka group id
|
||||
kafka.input.group.id=dos-detection-job-221125-1
|
||||
@@ -64,7 +64,7 @@ flink.detection.map.parallelism=1
|
||||
flink.watermark.max.orderness=300
|
||||
|
||||
#计算窗口大小,默认600s
|
||||
flink.window.max.time=600
|
||||
flink.window.max.time=60
|
||||
|
||||
#dos event结果中distinct source IP限制
|
||||
source.ip.list.limit=10000
|
||||
|
||||
Reference in New Issue
Block a user