修改时长统计逻辑
This commit is contained in:
3
pom.xml
3
pom.xml
@@ -6,7 +6,7 @@
|
||||
|
||||
<groupId>com.zdjizhi</groupId>
|
||||
<artifactId>flink-dos-detection</artifactId>
|
||||
<version>2.0</version>
|
||||
<version>2.1</version>
|
||||
|
||||
<name>flink-dos-detection</name>
|
||||
<url>http://www.example.com</url>
|
||||
@@ -152,7 +152,6 @@
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-connector-kafka_2.12</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
|
||||
<!-- <dependency>
|
||||
|
||||
@@ -52,15 +52,6 @@ public class DosConfigs {
|
||||
.stringType()
|
||||
.noDefaultValue();
|
||||
|
||||
/* public static final ConfigOption<String> BIFANG_SERVER_URI =
|
||||
ConfigOptions.key("bifang.server.uri")
|
||||
.stringType()
|
||||
.noDefaultValue();*/
|
||||
|
||||
public static final ConfigOption<String> KNOWLEDGE_BASE_URL =
|
||||
ConfigOptions.key("knowledge.base.uri")
|
||||
.stringType()
|
||||
.noDefaultValue();
|
||||
|
||||
|
||||
//==============================The following variables have default values=====================================
|
||||
@@ -108,17 +99,6 @@ public class DosConfigs {
|
||||
.defaultValue(90000);
|
||||
|
||||
|
||||
public static final ConfigOption<String> KNOWLEDGE_BASE_PATH =
|
||||
ConfigOptions.key("knowledge.base.path")
|
||||
.stringType()
|
||||
.defaultValue("/v1/knowledge_base");
|
||||
|
||||
|
||||
/* public static final ConfigOption<Integer> STATIC_THRESHOLD_SCHEDULE_MINUTES =
|
||||
ConfigOptions.key("static.threshold.schedule.minutes")
|
||||
.intType()
|
||||
.defaultValue(10);*/
|
||||
|
||||
|
||||
public static final ConfigOption<Integer> BASELINE_THRESHOLD_SCHEDULE_DAYS =
|
||||
ConfigOptions.key("baseline.threshold.schedule.days")
|
||||
@@ -160,32 +140,6 @@ public class DosConfigs {
|
||||
.doubleType()
|
||||
.defaultValue(8.0);
|
||||
|
||||
|
||||
/*public static final ConfigOption<String> BIFANG_SERVER_ENCRYPTPWD_PATH =
|
||||
ConfigOptions.key("bifang.server.encryptpwd.path")
|
||||
.stringType()
|
||||
.defaultValue("/v1/user/encryptpwd");
|
||||
|
||||
public static final ConfigOption<String> BIFANG_SERVER_POLICY_VSYSID_PATH =
|
||||
ConfigOptions.key("bifang.server.policy.vaysid.path")
|
||||
.stringType()
|
||||
.defaultValue("/v1/admin/vsys");
|
||||
|
||||
public static final ConfigOption<String> BIFANG_SERVER_TOKEN =
|
||||
ConfigOptions.key("bifang.server.token")
|
||||
.stringType()
|
||||
.defaultValue("aa2bdec5518ad131f71944b13ce5c298&1&");
|
||||
|
||||
public static final ConfigOption<String> BIFANG_SERVER_POLICY_THRESHOLD_PATH =
|
||||
ConfigOptions.key("bifang.server.policy.threshold.path")
|
||||
.stringType()
|
||||
.defaultValue("/v1/policy/profile/dos_detection");
|
||||
|
||||
public static final ConfigOption<String> BIFANG_SERVER_LOGIN_PATH =
|
||||
ConfigOptions.key("bifang.server.login.path")
|
||||
.stringType()
|
||||
.defaultValue("/v1/user/login");*/
|
||||
|
||||
public static final ConfigOption<Integer> HTTP_POOL_MAX_CONNECTION =
|
||||
ConfigOptions.key("http.pool.max.connection")
|
||||
.intType()
|
||||
|
||||
@@ -7,6 +7,8 @@ import org.apache.flink.configuration.Configuration;
|
||||
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
|
||||
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
|
||||
import org.apache.flink.util.Collector;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
@@ -17,6 +19,7 @@ public class MetricsCalculate extends ProcessWindowFunction<
|
||||
Tuple4<String, String, Integer, Integer>, // 键类型
|
||||
TimeWindow> { // 窗口类型
|
||||
private final Map<String, String> attackTypeMapping = new HashMap<>();
|
||||
private static Logger logger = LoggerFactory.getLogger(MetricsCalculate.class);
|
||||
|
||||
@Override
|
||||
public void open(Configuration parameters) throws Exception {
|
||||
@@ -33,10 +36,19 @@ public class MetricsCalculate extends ProcessWindowFunction<
|
||||
public void process(Tuple4<String, String, Integer, Integer> key, ProcessWindowFunction<DosSketchLog, DosSketchLog, Tuple4<String, String, Integer,Integer>, TimeWindow>.Context context, Iterable<DosSketchLog> elements, Collector<DosSketchLog> out) throws Exception {
|
||||
|
||||
for (DosSketchLog dosSketchLog: elements){
|
||||
dosSketchLog.setSession_rate(dosSketchLog.getSessions()/ (dosSketchLog.getDuration()/1000) );
|
||||
dosSketchLog.setPacket_rate(dosSketchLog.getPkts()/(dosSketchLog.getDuration()/1000));
|
||||
dosSketchLog.setBit_rate(dosSketchLog.getBytes()/(dosSketchLog.getDuration()/1000));
|
||||
try {
|
||||
long duration = dosSketchLog.getEnd_timestamp_ms()-dosSketchLog.getStart_timestamp_ms();
|
||||
if(duration<=0){
|
||||
duration = dosSketchLog.getDuration();
|
||||
dosSketchLog.setEnd_timestamp_ms(dosSketchLog.getStart_timestamp_ms()+duration);
|
||||
}
|
||||
dosSketchLog.setSession_rate(dosSketchLog.getSessions()/ (duration/1000) );
|
||||
dosSketchLog.setPacket_rate(dosSketchLog.getPkts()/(duration/1000));
|
||||
dosSketchLog.setBit_rate(dosSketchLog.getBytes()*8/(duration/1000));
|
||||
dosSketchLog.setAttack_type(attackTypeMapping.getOrDefault(dosSketchLog.getDecoded_as(),""));
|
||||
}catch (RuntimeException e){
|
||||
logger.error(e.toString());
|
||||
}
|
||||
out.collect(dosSketchLog);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
source.kafka.topic=DOS-SKETCH-METRIC,DOS-PROTECTION-RULE-METRIC
|
||||
source.kafka.props.bootstrap.servers=192.168.44.12:9094
|
||||
source.kafka.props.bootstrap.servers=192.168.44.11:9094
|
||||
source.kafka.props.group.id=dos-detection-job-20240402-t1
|
||||
source.kafka.props.session.timeout.ms=60000
|
||||
source.kafka.props.max.poll.records=5000
|
||||
@@ -24,7 +24,6 @@ sink.kafka.props.buffer.memory=134217728
|
||||
sink.kafka.props.max.request.size=10485760
|
||||
sink.kafka.props.compression.type=snappy
|
||||
|
||||
|
||||
ip.user.defined.kd.id=dasdasdsad
|
||||
|
||||
#zookeeper地址
|
||||
@@ -35,11 +34,6 @@ flink.watermark.max.orderness=30
|
||||
#计算窗口大小,默认600s
|
||||
flink.window.max.time=600
|
||||
|
||||
#cm服务访问地址
|
||||
bifang.server.uri=http://192.168.44.3
|
||||
|
||||
knowledge.base.uri=http://192.168.45.102:9999
|
||||
############################## 阈值 配置 ######################################
|
||||
static.sensitivity.threshold=1
|
||||
#基线敏感阈值
|
||||
baseline.sensitivity.threshold=0.2
|
||||
|
||||
Reference in New Issue
Block a user