Compare commits: dos-detect ... tsg-v08 (13 commits)
| SHA1 |
|---|
| 8c790354c0 |
| ca19b6c0e6 |
| b95ca44c01 |
| b2936abe1a |
| e4425f8116 |
| 28e7275674 |
| f744677021 |
| c957f3ec1c |
| 9bda526d48 |
| e89e1b08c9 |
| e0de04886b |
| 30a24683e3 |
| 5190654a8f |
pom.xml (80 changed lines)
@@ -114,13 +114,6 @@
|
||||
<version>1.7.21</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-sql-connector-kafka_2.11</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
<!--<scope>provided</scope>-->
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-connector-kafka_2.11</artifactId>
|
||||
@@ -128,53 +121,6 @@
|
||||
<!--<scope>provided</scope>-->
|
||||
</dependency>
|
||||
|
||||
<!--<!– https://mvnrepository.com/artifact/org.apache.flink/flink-table –>-->
|
||||
<!--<dependency>-->
|
||||
<!--<groupId>org.apache.flink</groupId>-->
|
||||
<!--<artifactId>flink-table</artifactId>-->
|
||||
<!--<version>${flink.version}</version>-->
|
||||
<!--<!–<scope>provided</scope>–>-->
|
||||
<!--</dependency>-->
|
||||
|
||||
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-json -->
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-json</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
|
||||
|
||||
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka-clients</artifactId>
|
||||
<version>1.0.0</version>
|
||||
</dependency>
|
||||
|
||||
<!--Flink modules-->
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-table-api-java</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
<!--<scope>provided</scope>-->
|
||||
</dependency>
|
||||
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-table-planner-blink_2.11</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
<!--<scope>provided</scope>-->
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-table-planner_2.11</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
<!--<scope>provided</scope>-->
|
||||
</dependency>
|
||||
|
||||
<!-- CLI dependencies -->
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
@@ -198,6 +144,23 @@
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
|
||||
<dependency>
|
||||
<groupId>org.apache.hbase</groupId>
|
||||
<artifactId>hbase-client</artifactId>
|
||||
<version>2.2.3</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<artifactId>slf4j-log4j12</artifactId>
|
||||
<groupId>org.slf4j</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>log4j-over-slf4j</artifactId>
|
||||
<groupId>org.slf4j</groupId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.zookeeper</groupId>
|
||||
<artifactId>zookeeper</artifactId>
|
||||
@@ -214,11 +177,10 @@
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-connector-hbase-2.2_2.11</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
<groupId>org.apache.httpcomponents</groupId>
|
||||
<artifactId>httpclient</artifactId>
|
||||
<version>4.5.6</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
@@ -230,7 +192,7 @@
|
||||
<dependency>
|
||||
<groupId>com.zdjizhi</groupId>
|
||||
<artifactId>galaxy</artifactId>
|
||||
<version>1.0.4</version>
|
||||
<version>1.0.6</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<artifactId>slf4j-log4j12</artifactId>
|
||||
|
||||
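A note on the `<scope>provided</scope>` lines that are commented in and out throughout this pom: in Maven, `provided` keeps a dependency on the compile classpath but excludes it from the packaged job jar, on the assumption that the Flink runtime on the cluster already ships it. A representative block, mirroring the flink-json dependency in the hunk above:

```xml
<!-- Illustrative only: "provided" scope means compile against the library but do
     not bundle it into the shaded job jar. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
```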
@@ -13,16 +13,16 @@ public class CommonConfig {
|
||||
public static final int KAFKA_INPUT_PARALLELISM = CommonConfigurations.getIntProperty("kafka.input.parallelism");
|
||||
public static final String KAFKA_INPUT_TOPIC_NAME = CommonConfigurations.getStringProperty("kafka.input.topic.name");
|
||||
public static final String KAFKA_INPUT_BOOTSTRAP_SERVERS = CommonConfigurations.getStringProperty("kafka.input.bootstrap.servers");
|
||||
public static final String KAFKA_SCAN_STARTUP_MODE = CommonConfigurations.getStringProperty("kafka.input.scan.startup.mode");
|
||||
public static final String KAFKA_GROUP_ID = CommonConfigurations.getStringProperty("kafka.input.group.id");
|
||||
|
||||
public static final int KAFKA_OUTPUT_METRIC_PARALLELISM = CommonConfigurations.getIntProperty("kafka.output.metric.parallelism");
|
||||
public static final String KAFKA_OUTPUT_METRIC_TOPIC_NAME = CommonConfigurations.getStringProperty("kafka.output.metric.topic.name");
|
||||
public static final int KAFKA_OUTPUT_EVENT_PARALLELISM = CommonConfigurations.getIntProperty("kafka.output.event.parallelism");
|
||||
public static final String KAFKA_OUTPUT_EVENT_TOPIC_NAME = CommonConfigurations.getStringProperty("kafka.output.event.topic.name");
|
||||
public static final String KAFKA_OUTPUT_SKETCH_TOPIC_NAME = CommonConfigurations.getStringProperty("kafka.output.sketch.topic.name");
|
||||
public static final int KAFKA_OUTPUT_SKETCH_PARALLELISM = CommonConfigurations.getIntProperty("kafka.output.sketch.parallelism");
|
||||
public static final String KAFKA_OUTPUT_BOOTSTRAP_SERVERS = CommonConfigurations.getStringProperty("kafka.output.bootstrap.servers");
|
||||
|
||||
public static final int HBASE_INPUT_PARALLELISM = CommonConfigurations.getIntProperty("hbase.input.parallelism");
|
||||
public static final String HBASE_ZOOKEEPER_QUORUM = CommonConfigurations.getStringProperty("hbase.zookeeper.quorum");
|
||||
public static final int HBASE_CLIENT_OPERATION_TIMEOUT = CommonConfigurations.getIntProperty("hbase.client.operation.timeout");
|
||||
public static final int HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD = CommonConfigurations.getIntProperty("hbase.client.scanner.timeout.period");
|
||||
@@ -31,19 +31,24 @@ public class CommonConfig {
|
||||
public static final int HBASE_BASELINE_TOTAL_NUM = CommonConfigurations.getIntProperty("hbase.baseline.total.num");
|
||||
|
||||
public static final int FLINK_FIRST_AGG_PARALLELISM = CommonConfigurations.getIntProperty("flink.first.agg.parallelism");
|
||||
public static final int FLINK_SECOND_AGG_PARALLELISM = CommonConfigurations.getIntProperty("flink.second.agg.parallelism");
|
||||
public static final int FLINK_DETECTION_MAP_PARALLELISM = CommonConfigurations.getIntProperty("flink.detection.map.parallelism");
|
||||
public static final int FLINK_WATERMARK_MAX_ORDERNESS = CommonConfigurations.getIntProperty("flink.watermark.max.orderness");
|
||||
public static final int FLINK_WINDOW_MAX_TIME = CommonConfigurations.getIntProperty("flink.window.max.time");
|
||||
|
||||
public static final int SOURCE_IP_LIST_LIMIT = CommonConfigurations.getIntProperty("source.ip.list.limit");
|
||||
public static final int DESTINATION_IP_PARTITION_NUM = CommonConfigurations.getIntProperty("destination.ip.partition.num");
|
||||
public static final int DATA_CENTER_ID_NUM = CommonConfigurations.getIntProperty("data.center.id.num");
|
||||
|
||||
public static final String IP_MMDB_PATH = CommonConfigurations.getStringProperty("ip.mmdb.path");
|
||||
|
||||
public static final int SENSITIVITY_THRESHOLD = CommonConfigurations.getIntProperty("sensitivity.threshold");
|
||||
|
||||
public static final double BASELINE_SESSIONS_MINOR_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.minor.threshold");
|
||||
public static final double BASELINE_SESSIONS_WARNING_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.warning.threshold");
|
||||
public static final double BASELINE_SESSIONS_MAJOR_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.major.threshold");
|
||||
public static final double BASELINE_SESSIONS_SEVERE_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.severe.threshold");
|
||||
public static final double BASELINE_SESSIONS_CRITICAL_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.critical.threshold");
|
||||
|
||||
public static final int BASELINE_THRESHOLD_SCHEDULE_DAYS = CommonConfigurations.getIntProperty("baseline.threshold.schedule.days");
|
||||
|
||||
}
|
||||
|
||||
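All constants in CommonConfig are resolved through the project's CommonConfigurations helper, which is not part of this diff. Below is a minimal sketch of such a properties-backed loader; only the three method names are taken from the calls above, while the resource name and loading logic are assumptions:

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

// Hypothetical sketch of CommonConfigurations: a classpath properties file backs the
// getStringProperty / getIntProperty / getDoubleProperty calls made by CommonConfig.
public final class CommonConfigurations {

    private static final Properties PROPS = new Properties();

    static {
        // "dos-detection.properties" is an assumed resource name.
        try (InputStream in = CommonConfigurations.class.getClassLoader()
                .getResourceAsStream("dos-detection.properties")) {
            PROPS.load(in);
        } catch (IOException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    public static String getStringProperty(String key) {
        return PROPS.getProperty(key);
    }

    public static int getIntProperty(String key) {
        return Integer.parseInt(PROPS.getProperty(key));
    }

    public static double getDoubleProperty(String key) {
        return Double.parseDouble(PROPS.getProperty(key));
    }
}
```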
@@ -5,13 +5,20 @@ import java.io.Serializable;
|
||||
public class DosMetricsLog implements Serializable {
|
||||
|
||||
private long sketch_start_time;
|
||||
private String common_sled_ip;
|
||||
private String common_data_center;
|
||||
private String attack_type;
|
||||
private String destination_ip;
|
||||
private long session_rate;
|
||||
private long packet_rate;
|
||||
private long bit_rate;
|
||||
private int partition_num;
|
||||
|
||||
public int getPartition_num() {
|
||||
return partition_num;
|
||||
}
|
||||
|
||||
public void setPartition_num(int partition_num) {
|
||||
this.partition_num = partition_num;
|
||||
}
|
||||
|
||||
public long getSketch_start_time() {
|
||||
return sketch_start_time;
|
||||
@@ -21,22 +28,6 @@ public class DosMetricsLog implements Serializable {
|
||||
this.sketch_start_time = sketch_start_time;
|
||||
}
|
||||
|
||||
public String getCommon_sled_ip() {
|
||||
return common_sled_ip;
|
||||
}
|
||||
|
||||
public void setCommon_sled_ip(String common_sled_ip) {
|
||||
this.common_sled_ip = common_sled_ip;
|
||||
}
|
||||
|
||||
public String getCommon_data_center() {
|
||||
return common_data_center;
|
||||
}
|
||||
|
||||
public void setCommon_data_center(String common_data_center) {
|
||||
this.common_data_center = common_data_center;
|
||||
}
|
||||
|
||||
public String getAttack_type() {
|
||||
return attack_type;
|
||||
}
|
||||
@@ -81,13 +72,12 @@ public class DosMetricsLog implements Serializable {
|
||||
public String toString() {
|
||||
return "DosMetricsLog{" +
|
||||
"sketch_start_time=" + sketch_start_time +
|
||||
", common_sled_ip='" + common_sled_ip + '\'' +
|
||||
", common_data_center='" + common_data_center + '\'' +
|
||||
", attack_type='" + attack_type + '\'' +
|
||||
", destination_ip='" + destination_ip + '\'' +
|
||||
", session_rate=" + session_rate +
|
||||
", packet_rate=" + packet_rate +
|
||||
", bit_rate=" + bit_rate +
|
||||
", partition_num=" + partition_num +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,112 +3,89 @@ package com.zdjizhi.etl;
|
||||
import com.zdjizhi.common.CommonConfig;
|
||||
import com.zdjizhi.common.DosEventLog;
|
||||
import com.zdjizhi.common.DosSketchLog;
|
||||
import com.zdjizhi.sink.OutputStreamSink;
|
||||
import com.zdjizhi.utils.HbaseUtils;
|
||||
import com.zdjizhi.utils.IpUtils;
|
||||
import com.zdjizhi.utils.SnowflakeId;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.flink.api.common.state.MapStateDescriptor;
|
||||
import org.apache.flink.api.common.typeinfo.Types;
|
||||
import org.apache.flink.api.java.typeutils.MapTypeInfo;
|
||||
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
|
||||
import org.apache.flink.api.common.functions.RichMapFunction;
|
||||
import org.apache.flink.api.java.tuple.Tuple2;
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
|
||||
import org.apache.flink.util.Collector;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.text.NumberFormat;
|
||||
import java.text.ParseException;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
 * @author wlh
 * DoS detection decision logic
 */
public class DosDetection extends BroadcastProcessFunction<DosSketchLog, Map<String, Map<String, List<Integer>>>, DosEventLog> {
public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {

private static final Logger logger = LoggerFactory.getLogger(DosDetection.class);

private static Map<String, Map<String, Tuple2<ArrayList<Integer>, Integer>>> baselineMap;
private final static int BASELINE_SIZE = 144;

private static MapStateDescriptor<String, Map<String, Map<String, List<Integer>>>> descriptor = new MapStateDescriptor<>("boradcast-state",
Types.STRING,
new MapTypeInfo<>(String.class, new MapTypeInfo<>(String.class, (Class<List<Integer>>) (Class<?>) List.class).getTypeClass()));

private final static NumberFormat PERCENT_INSTANCE = NumberFormat.getPercentInstance();

@Override
public void open(Configuration parameters) {
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(2,
new BasicThreadFactory.Builder().namingPattern("Dos-Detection-%d").daemon(true).build());
try {
executorService.scheduleAtFixedRate(() -> {
//refresh the in-memory baseline map from HBase on a fixed schedule
baselineMap = HbaseUtils.readFromHbase();
}, 0, CommonConfig.BASELINE_THRESHOLD_SCHEDULE_DAYS, TimeUnit.DAYS);

} catch (Exception e) {
logger.error("Scheduled baseline refresh task failed", e);
}
PERCENT_INSTANCE.setMinimumFractionDigits(2);
}
|
||||
@Override
public void processElement(DosSketchLog value, ReadOnlyContext ctx, Collector<DosEventLog> out) throws Exception {
public DosEventLog map(DosSketchLog value) {
try {
Map<String, Map<String, List<Integer>>> broadcast = ctx.getBroadcastState(descriptor).get("broadcast-state");
String destinationIp = value.getDestination_ip();
String attackType = value.getAttack_type();
logger.info("IP under evaluation: {}, type: {}", destinationIp, attackType);
if (broadcast.containsKey(destinationIp)){
List<Integer> baseline = broadcast.get(destinationIp).get(attackType);
if (baseline != null && baseline.size() == BASELINE_SIZE){
int timeIndex = getCurrentTimeIndex(value.getSketch_start_time());
Integer base = baseline.get(timeIndex);
long sketchSessions = value.getSketch_sessions();
long diff = sketchSessions - base;
if (diff > 0){
String percent = getDiffPercent(diff, sketchSessions);
double diffPercentDouble = getDiffPercentDouble(percent);
Severity severity = judgeSeverity(diffPercentDouble);
if (severity != Severity.NORMAL){
DosEventLog result = getResult(value, severity, percent);
logger.info("Detected server IP {} with {} anomaly, details\n {}", destinationIp, attackType, result.toString());
out.collect(result);
}else {
logger.info("Server IP {} shows no {} anomaly, details {}", destinationIp, attackType, value.toString());
}
logger.debug("IP under evaluation: {}, type: {}", destinationIp, attackType);
long sketchSessions = value.getSketch_sessions();
if (sketchSessions > CommonConfig.SENSITIVITY_THRESHOLD && baselineMap.containsKey(destinationIp)) {
Tuple2<ArrayList<Integer>, Integer> floodTypeTup = baselineMap.get(destinationIp).get(attackType);
Integer base = getBaseValue(floodTypeTup, value);
long diff = sketchSessions - base;
if (diff > 0 && base != 0) {
double percent = getDiffPercent(diff, base);
Severity severity = judgeSeverity(percent);
if (severity != Severity.NORMAL) {
DosEventLog result = getResult(value, severity, percent);
logger.info("Detected server IP {} with {} anomaly, exceeding baseline {} by {}x, details\n {}", destinationIp, attackType, base, percent, result);
return result;
} else {
logger.debug("Server IP {} shows no {} anomaly, details {}", destinationIp, attackType, value);
}
}
}else {
logger.info("No baseline data found for server IP {} of type {}", destinationIp, attackType);
} else {
logger.debug("No baseline data found for server IP {} of type {}", destinationIp, attackType);
}
}catch (Exception e){
logger.error("Detection failed\n {} \n{}", value, e);
} catch (Exception e) {
logger.error("Detection failed\n {} \n{}", value, e);
}
return null;
}
|
||||
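For intuition about the arithmetic in map(): with flink.window.max.time=600 the day splits into 144 baseline slots, so a sketch starting at 08:25 UTC falls into slot 50; if that slot's baseline is 200 sessions/s and the observed rate is 700 (above sensitivity.threshold=100), the ratio (700 - 200) / 200 = 2.5 falls between the major (1) and severe (3) thresholds, i.e. Severity.MAJOR. A standalone sketch of that calculation, with threshold values copied from the sample properties file later in this diff:

```java
// Self-contained illustration of the detection math; not project code.
public class DetectionMathDemo {

    enum Severity { NORMAL, MINOR, WARNING, MAJOR, SEVERE, CRITICAL }

    public static void main(String[] args) {
        long windowSeconds = 600;                            // flink.window.max.time
        long sketchStartTime = 19_000L * 86_400L + 30_300L;  // 2022-01-08 08:25:00 UTC
        int slotsPerDay = (int) (86_400 / windowSeconds);    // 144, matches BASELINE_SIZE

        // Equivalent to getCurrentTimeIndex(): seconds into the day / window length.
        int timeIndex = (int) ((sketchStartTime % 86_400) / windowSeconds);  // 50

        long base = 200;      // baseline sessions/s for this slot
        long observed = 700;  // sketch_sessions, above sensitivity.threshold=100
        double ratio = (double) (observed - base) / base;    // 2.5

        // Thresholds from the sample properties: 0.1 / 0.5 / 1 / 3 / 8.
        Severity severity =
                ratio >= 8 ? Severity.CRITICAL :
                ratio >= 3 ? Severity.SEVERE :
                ratio >= 1 ? Severity.MAJOR :
                ratio >= 0.5 ? Severity.WARNING :
                ratio >= 0.1 ? Severity.MINOR : Severity.NORMAL;

        System.out.printf("slot %d of %d, ratio %.2f -> %s%n",
                timeIndex, slotsPerDay, ratio, severity);    // slot 50 of 144, ratio 2.50 -> MAJOR
    }
}
```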
@Override
|
||||
public void processBroadcastElement(Map<String, Map<String, List<Integer>>> value, Context ctx, Collector<DosEventLog> out) {
|
||||
try {
|
||||
ctx.getBroadcastState(descriptor).put("broadcast-state", value);
|
||||
}catch (Exception e){
|
||||
logger.error("更新广播状态失败 {}",e);
|
||||
}
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
DosDetection dosDetection = new DosDetection();
|
||||
// HashSet<String> strings = new HashSet<>();
|
||||
// strings.add("13.46.241.36");
|
||||
// strings.add("25.46.241.45");
|
||||
// strings.add("133.46.241.53");
|
||||
// strings.add("219.46.242.74");
|
||||
// strings.add("153.146.241.196");
|
||||
// strings.add("132.46.241.21");
|
||||
// String join = StringUtils.join(strings, ",");
|
||||
// System.out.println(IpUtils.ipLookup.countryLookup("192.168.50.150"));
|
||||
System.out.println(Severity.CRITICAL.severity);
|
||||
}
|
||||
|
||||
private DosEventLog getResult(DosSketchLog value,Severity severity,String percent){
|
||||
private DosEventLog getResult(DosSketchLog value, Severity severity, double percent) {
|
||||
DosEventLog dosEventLog = new DosEventLog();
|
||||
dosEventLog.setLog_id(SnowflakeId.generateId());
|
||||
dosEventLog.setStart_time(value.getSketch_start_time());
|
||||
dosEventLog.setEnd_time(value.getSketch_start_time()+CommonConfig.FLINK_WINDOW_MAX_TIME);
|
||||
dosEventLog.setEnd_time(value.getSketch_start_time() + CommonConfig.FLINK_WINDOW_MAX_TIME);
|
||||
dosEventLog.setAttack_type(value.getAttack_type());
|
||||
dosEventLog.setSeverity(severity.toString());
|
||||
dosEventLog.setConditions(getConditions(percent));
|
||||
dosEventLog.setConditions(getConditions(PERCENT_INSTANCE.format(percent)));
|
||||
dosEventLog.setDestination_ip(value.getDestination_ip());
|
||||
dosEventLog.setDestination_country(IpUtils.ipLookup.countryLookup(value.getDestination_ip()));
|
||||
String ipList = value.getSource_ip();
|
||||
@@ -120,47 +97,70 @@ public class DosDetection extends BroadcastProcessFunction<DosSketchLog, Map<Str
|
||||
return dosEventLog;
|
||||
}
|
||||
|
||||
private String getConditions(String percent){
|
||||
return "sessions > "+percent+" of baseline";
|
||||
private Integer getBaseValue(Tuple2<ArrayList<Integer>, Integer> floodTypeTup, DosSketchLog value) {
|
||||
Integer base = 0;
|
||||
try {
|
||||
if (floodTypeTup != null) {
|
||||
ArrayList<Integer> baselines = floodTypeTup.f0;
|
||||
Integer defaultVaule = floodTypeTup.f1;
|
||||
if (baselines != null && baselines.size() == BASELINE_SIZE) {
|
||||
int timeIndex = getCurrentTimeIndex(value.getSketch_start_time());
|
||||
base = baselines.get(timeIndex);
|
||||
if (base == 0) {
|
||||
logger.debug("获取到当前IP: {},类型: {} baseline值为0,替换为P95观测值{}", value.getDestination_ip(), value.getAttack_type(), defaultVaule);
|
||||
base = defaultVaule;
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("解析baseline数据失败,返回默认值0", e);
|
||||
}
|
||||
return base;
|
||||
}
|
||||
|
||||
private String getSourceCountryList(String sourceIpList){
|
||||
private String getConditions(String percent) {
|
||||
return "sessions > " + percent + " of baseline";
|
||||
}
|
||||
|
||||
private String getSourceCountryList(String sourceIpList) {
|
||||
String[] ipArr = sourceIpList.split(",");
|
||||
HashSet<String> countrySet = new HashSet<>();
|
||||
for (String ip:ipArr){
|
||||
for (String ip : ipArr) {
|
||||
countrySet.add(IpUtils.ipLookup.countryLookup(ip));
|
||||
}
|
||||
return StringUtils.join(countrySet,",");
|
||||
return StringUtils.join(countrySet, ",");
|
||||
}
|
||||
|
||||
private int getCurrentTimeIndex(long sketchStartTime){
|
||||
private int getCurrentTimeIndex(long sketchStartTime) {
|
||||
long currentDayTime = sketchStartTime / (60 * 60 * 24) * 60 * 60 * 24;
|
||||
long indexLong = (sketchStartTime - currentDayTime) / 600;
|
||||
return Integer.parseInt(Long.toString(indexLong));
|
||||
}
|
||||
|
||||
private String getDiffPercent(long diff,long sketchSessions){
|
||||
double diffDou = Double.parseDouble(Long.toString(diff));
|
||||
double sessDou = Double.parseDouble(Long.toString(sketchSessions));
|
||||
return PERCENT_INSTANCE.format(diffDou / sessDou);
|
||||
private Double getDiffPercent(long diff, long base) {
|
||||
return BigDecimal.valueOf((float) diff / base).setScale(4, BigDecimal.ROUND_HALF_UP).doubleValue();
|
||||
}
|
||||
|
||||
private double getDiffPercentDouble(String diffPercent) throws ParseException {
|
||||
return PERCENT_INSTANCE.parse(diffPercent).doubleValue();
|
||||
public static void main(String[] args) {
|
||||
DosDetection dosDetection = new DosDetection();
|
||||
double diffPercent = dosDetection.getDiffPercent(135, 17);
|
||||
System.out.println(diffPercent);
|
||||
System.out.println(dosDetection.judgeSeverity(4.2857142857142856E14));
|
||||
System.out.println(BigDecimal.valueOf((float) 10 / 3).setScale(4, BigDecimal.ROUND_HALF_UP).doubleValue());
|
||||
}
|
||||
|
||||
private Severity judgeSeverity(double diffPercent){
|
||||
if (diffPercent >= CommonConfig.BASELINE_SESSIONS_MINOR_THRESHOLD && diffPercent < CommonConfig.BASELINE_SESSIONS_WARNING_THRESHOLD){
|
||||
private Severity judgeSeverity(double diffPercent) {
|
||||
if (diffPercent >= CommonConfig.BASELINE_SESSIONS_MINOR_THRESHOLD && diffPercent < CommonConfig.BASELINE_SESSIONS_WARNING_THRESHOLD) {
|
||||
return Severity.MINOR;
|
||||
}else if (diffPercent >= CommonConfig.BASELINE_SESSIONS_WARNING_THRESHOLD && diffPercent < CommonConfig.BASELINE_SESSIONS_MAJOR_THRESHOLD){
|
||||
} else if (diffPercent >= CommonConfig.BASELINE_SESSIONS_WARNING_THRESHOLD && diffPercent < CommonConfig.BASELINE_SESSIONS_MAJOR_THRESHOLD) {
|
||||
return Severity.WARNING;
|
||||
}else if (diffPercent >= CommonConfig.BASELINE_SESSIONS_MAJOR_THRESHOLD && diffPercent < CommonConfig.BASELINE_SESSIONS_SEVERE_THRESHOLD){
|
||||
} else if (diffPercent >= CommonConfig.BASELINE_SESSIONS_MAJOR_THRESHOLD && diffPercent < CommonConfig.BASELINE_SESSIONS_SEVERE_THRESHOLD) {
|
||||
return Severity.MAJOR;
|
||||
}else if (diffPercent >= CommonConfig.BASELINE_SESSIONS_SEVERE_THRESHOLD && diffPercent < CommonConfig.BASELINE_SESSIONS_CRITICAL_THRESHOLD){
|
||||
} else if (diffPercent >= CommonConfig.BASELINE_SESSIONS_SEVERE_THRESHOLD && diffPercent < CommonConfig.BASELINE_SESSIONS_CRITICAL_THRESHOLD) {
|
||||
return Severity.SEVERE;
|
||||
}else if (diffPercent >= CommonConfig.BASELINE_SESSIONS_CRITICAL_THRESHOLD){
|
||||
} else if (diffPercent >= CommonConfig.BASELINE_SESSIONS_CRITICAL_THRESHOLD) {
|
||||
return Severity.CRITICAL;
|
||||
}else {
|
||||
} else {
|
||||
return Severity.NORMAL;
|
||||
}
|
||||
}
|
||||
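Two small observations on the helpers above: BigDecimal.ROUND_HALF_UP used in getDiffPercent is deprecated in newer JDKs in favour of RoundingMode.HALF_UP, and the (float) cast narrows the ratio before rounding. A sketch of a near-equivalent version that avoids both, producing 7.9412 for the (135, 17) pair exercised in main():

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

// Near-equivalent of getDiffPercent using RoundingMode and double precision.
public class RatioDemo {

    static double diffRatio(long diff, long base) {
        return BigDecimal.valueOf((double) diff / base)
                .setScale(4, RoundingMode.HALF_UP)
                .doubleValue();
    }

    public static void main(String[] args) {
        System.out.println(diffRatio(135, 17));  // 7.9412
    }
}
```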
@@ -188,4 +188,5 @@ public class DosDetection extends BroadcastProcessFunction<DosSketchLog, Map<Str
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@ package com.zdjizhi.etl;
|
||||
import com.zdjizhi.common.CommonConfig;
|
||||
import com.zdjizhi.common.DosSketchLog;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.flink.api.java.tuple.Tuple4;
|
||||
import org.apache.flink.api.java.tuple.Tuple2;
|
||||
import org.apache.flink.api.java.tuple.Tuple6;
|
||||
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
|
||||
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
|
||||
@@ -18,18 +18,18 @@ import static com.zdjizhi.sink.OutputStreamSink.outputTag;
|
||||
/**
|
||||
* @author 94976
|
||||
*/
|
||||
public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosSketchLog, Tuple4<String,String,String,String>, TimeWindow> {
|
||||
public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosSketchLog, Tuple2<String,String>, TimeWindow> {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(EtlProcessFunction.class);
|
||||
@Override
|
||||
public void process(Tuple4<String,String, String, String> keys,
|
||||
public void process(Tuple2<String, String> keys,
|
||||
Context context, Iterable<DosSketchLog> elements,
|
||||
Collector<DosSketchLog> out) {
|
||||
DosSketchLog middleResult = getMiddleResult(keys, elements);
|
||||
try {
|
||||
if (middleResult != null){
|
||||
out.collect(middleResult);
|
||||
logger.info("获取中间聚合结果:{}",middleResult.toString());
|
||||
logger.debug("获取中间聚合结果:{}",middleResult.toString());
|
||||
context.output(outputTag,TrafficServerIpMetrics.getOutputMetric(middleResult));
|
||||
}
|
||||
}catch (Exception e){
|
||||
@@ -37,16 +37,14 @@ public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosS
|
||||
}
|
||||
}
|
||||
|
||||
private DosSketchLog getMiddleResult(Tuple4<String,String, String, String> keys,Iterable<DosSketchLog> elements){
|
||||
private DosSketchLog getMiddleResult(Tuple2<String, String> keys,Iterable<DosSketchLog> elements){
|
||||
|
||||
DosSketchLog midResuleLog = new DosSketchLog();
|
||||
Tuple6<Long, Long, Long,String,Long,Long> values = sketchAggregate(elements);
|
||||
try {
|
||||
if (values != null){
|
||||
midResuleLog.setCommon_sled_ip(keys.f0);
|
||||
midResuleLog.setCommon_data_center(keys.f1);
|
||||
midResuleLog.setDestination_ip(keys.f3);
|
||||
midResuleLog.setAttack_type(keys.f2);
|
||||
midResuleLog.setAttack_type(keys.f0);
|
||||
midResuleLog.setDestination_ip(keys.f1);
|
||||
midResuleLog.setSketch_start_time(values.f4);
|
||||
midResuleLog.setSketch_duration(values.f5);
|
||||
midResuleLog.setSource_ip(values.f3);
|
||||
@@ -82,7 +80,9 @@ public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosS
|
||||
}
|
||||
}
|
||||
String sourceIpList = StringUtils.join(sourceIpSet, ",");
|
||||
return Tuple6.of(sessions/cnt,packets/cnt,bytes/cnt,sourceIpList,startTime,duration);
|
||||
// return Tuple6.of(sessions/cnt/duration,packets/cnt/duration,bytes/cnt/duration,sourceIpList,startTime,duration);
|
||||
return Tuple6.of(sessions/CommonConfig.FLINK_WINDOW_MAX_TIME,packets/CommonConfig.FLINK_WINDOW_MAX_TIME,
|
||||
bytes/CommonConfig.FLINK_WINDOW_MAX_TIME,sourceIpList,startTime,duration);
|
||||
}catch (Exception e){
|
||||
logger.error("聚合中间结果集失败 {}",e);
|
||||
}
|
||||
|
||||
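The aggregation above changes the normalization: window totals are now divided by the configured window length, giving per-second rates, instead of being averaged over the number of contributing records. Note also that ParseSketchLog (next hunk) no longer divides by sketch_duration or multiplies bytes by 8, so any bits-per-second conversion is presumably handled downstream if still needed. A tiny illustration with invented numbers:

```java
// Invented numbers illustrating the new per-second normalization in sketchAggregate.
public class RateDemo {
    public static void main(String[] args) {
        long windowSeconds = 600;              // CommonConfig.FLINK_WINDOW_MAX_TIME
        long sessionsInWindow = 1_200_000L;
        long packetsInWindow = 48_000_000L;
        long bytesInWindow = 6_000_000_000L;
        System.out.println(sessionsInWindow / windowSeconds);  // 2000 sessions/s
        System.out.println(packetsInWindow / windowSeconds);   // 80000 packets/s
        System.out.println(bytesInWindow / windowSeconds);     // 10000000 bytes/s
    }
}
```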
@@ -17,6 +17,9 @@ import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
|
||||
/**
|
||||
* @author wlh
|
||||
*/
|
||||
public class ParseSketchLog {
|
||||
|
||||
private static Logger logger = LoggerFactory.getLogger(ParseSketchLog.class);
|
||||
@@ -37,20 +40,16 @@ public class ParseSketchLog {
|
||||
|
||||
private static class FlatSketchLog implements FlatMapFunction<String, DosSketchLog> {
|
||||
@Override
|
||||
public void flatMap(String s, Collector<DosSketchLog> collector) throws Exception {
|
||||
public void flatMap(String s, Collector<DosSketchLog> collector) {
|
||||
try {
|
||||
if (StringUtil.isNotBlank(s)){
|
||||
HashMap<String, Object> sketchSource = (HashMap<String, Object>) JsonMapper.fromJsonString(s, Object.class);
|
||||
String commonSledIp = sketchSource.get("common_sled_ip").toString();
|
||||
String commonDataCenter = sketchSource.get("common_data_center").toString();
|
||||
long sketchStartTime = Long.parseLong(sketchSource.get("sketch_start_time").toString());
|
||||
long sketchDuration = Long.parseLong(sketchSource.get("sketch_duration").toString());
|
||||
String attackType = sketchSource.get("attack_type").toString();
|
||||
ArrayList<HashMap<String, Object>> reportIpList = (ArrayList<HashMap<String, Object>>) sketchSource.get("report_ip_list");
|
||||
for (HashMap<String, Object> obj : reportIpList) {
|
||||
DosSketchLog dosSketchLog = new DosSketchLog();
|
||||
dosSketchLog.setCommon_sled_ip(commonSledIp);
|
||||
dosSketchLog.setCommon_data_center(commonDataCenter);
|
||||
dosSketchLog.setSketch_start_time(sketchStartTime);
|
||||
dosSketchLog.setSketch_duration(sketchDuration);
|
||||
dosSketchLog.setAttack_type(attackType);
|
||||
@@ -61,11 +60,11 @@ public class ParseSketchLog {
|
||||
long sketchBytes = Long.parseLong(obj.get("sketch_bytes").toString());
|
||||
dosSketchLog.setSource_ip(sourceIp);
|
||||
dosSketchLog.setDestination_ip(destinationIp);
|
||||
dosSketchLog.setSketch_sessions(sketchSessions/sketchDuration);
|
||||
dosSketchLog.setSketch_packets(sketchPackets/sketchDuration);
|
||||
dosSketchLog.setSketch_bytes(sketchBytes*8/sketchDuration);
|
||||
dosSketchLog.setSketch_sessions(sketchSessions);
|
||||
dosSketchLog.setSketch_packets(sketchPackets);
|
||||
dosSketchLog.setSketch_bytes(sketchBytes);
|
||||
collector.collect(dosSketchLog);
|
||||
logger.info("数据解析成功:{}",dosSketchLog.toString());
|
||||
logger.debug("数据解析成功:{}",dosSketchLog.toString());
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
|
||||
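For reference, the flatMap above expects each Kafka record to be one JSON object with the per-destination entries nested under report_ip_list. The field names below are taken from the accessors in this hunk; the values are invented:

```json
{
  "common_sled_ip": "192.168.44.21",
  "common_data_center": "dc-01",
  "sketch_start_time": 1641601800,
  "sketch_duration": 60,
  "attack_type": "TCP SYN Flood",
  "report_ip_list": [
    {
      "source_ip": "13.46.241.36,25.46.241.45",
      "destination_ip": "61.128.159.186",
      "sketch_sessions": 120000,
      "sketch_packets": 4800000,
      "sketch_bytes": 600000000
    }
  ]
}
```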
@@ -14,14 +14,13 @@ class TrafficServerIpMetrics {
|
||||
static DosMetricsLog getOutputMetric(DosSketchLog midResuleLog) {
|
||||
DosMetricsLog dosMetricsLog = new DosMetricsLog();
|
||||
dosMetricsLog.setSketch_start_time(timeFloor(System.currentTimeMillis()/1000));
|
||||
dosMetricsLog.setCommon_sled_ip(midResuleLog.getCommon_sled_ip());
|
||||
dosMetricsLog.setCommon_data_center(midResuleLog.getCommon_data_center());
|
||||
dosMetricsLog.setDestination_ip(midResuleLog.getDestination_ip());
|
||||
dosMetricsLog.setAttack_type(midResuleLog.getAttack_type());
|
||||
dosMetricsLog.setSession_rate(midResuleLog.getSketch_sessions());
|
||||
dosMetricsLog.setPacket_rate(midResuleLog.getSketch_packets());
|
||||
dosMetricsLog.setBit_rate(midResuleLog.getSketch_bytes());
|
||||
logger.info("metric 结果已加载:{}",dosMetricsLog.toString());
|
||||
dosMetricsLog.setPartition_num(getPartitionNumByIp(midResuleLog.getDestination_ip()));
|
||||
logger.debug("metric 结果已加载:{}",dosMetricsLog.toString());
|
||||
return dosMetricsLog;
|
||||
}
|
||||
|
||||
@@ -29,4 +28,13 @@ class TrafficServerIpMetrics {
|
||||
return sketchStartTime / CommonConfig.FLINK_WINDOW_MAX_TIME * CommonConfig.FLINK_WINDOW_MAX_TIME;
|
||||
}
|
||||
|
||||
private static int getPartitionNumByIp(String destinationIp){
|
||||
return Math.abs(destinationIp.hashCode()) % CommonConfig.DESTINATION_IP_PARTITION_NUM;
|
||||
}
|
||||
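One caveat on the partitioning helper above: Math.abs(Integer.MIN_VALUE) is still negative, so in the unlikely case that an IP string hashes to Integer.MIN_VALUE the modulo result would be negative. A defensive variant using floorMod:

```java
// Variant of getPartitionNumByIp that can never return a negative partition number.
private static int getPartitionNumByIp(String destinationIp) {
    return Math.floorMod(destinationIp.hashCode(), CommonConfig.DESTINATION_IP_PARTITION_NUM);
}
```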
|
||||
public static void main(String[] args) {
|
||||
System.out.println(getPartitionNumByIp("146.177.223.43"));
|
||||
System.out.println("146.177.223.43".hashCode());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -2,6 +2,10 @@ package com.zdjizhi.main;
|
||||
|
||||
import com.zdjizhi.sink.OutputStreamSink;
|
||||
|
||||
/**
|
||||
* @author wlh
|
||||
* Main entry point of the application
|
||||
*/
|
||||
public class DosDetectionApplication {
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
||||
@@ -6,10 +6,15 @@ import com.zdjizhi.utils.JsonMapper;
|
||||
import com.zdjizhi.utils.KafkaUtils;
|
||||
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
class DosEventSink {
|
||||
|
||||
static void dosEventOutputSink(SingleOutputStreamOperator<DosEventLog> dosEventLogOutputStream){
|
||||
dosEventLogOutputStream.map(JsonMapper::toJsonString).addSink(KafkaUtils.getKafkaSink(CommonConfig.KAFKA_OUTPUT_EVENT_TOPIC_NAME))
|
||||
dosEventLogOutputStream
|
||||
.filter(Objects::nonNull)
|
||||
.map(JsonMapper::toJsonString)
|
||||
.addSink(KafkaUtils.getKafkaSink(CommonConfig.KAFKA_OUTPUT_EVENT_TOPIC_NAME))
|
||||
.setParallelism(CommonConfig.KAFKA_OUTPUT_EVENT_PARALLELISM);
|
||||
}
|
||||
|
||||
|
||||
src/main/java/com/zdjizhi/sink/DosSketchSink.java (new file, 21 lines)
@@ -0,0 +1,21 @@
|
||||
package com.zdjizhi.sink;

import com.zdjizhi.common.CommonConfig;
import com.zdjizhi.common.DosSketchLog;
import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.KafkaUtils;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import java.util.Objects;

class DosSketchSink {

    static void dosSketchOutputSink(SingleOutputStreamOperator<DosSketchLog> sketchSource){
        sketchSource
                .filter(Objects::nonNull)
                .map(JsonMapper::toJsonString)
                .addSink(KafkaUtils.getKafkaSink(CommonConfig.KAFKA_OUTPUT_SKETCH_TOPIC_NAME))
                .setParallelism(CommonConfig.KAFKA_OUTPUT_SKETCH_PARALLELISM);
    }

}
|
||||
@@ -4,19 +4,12 @@ import com.zdjizhi.common.CommonConfig;
|
||||
import com.zdjizhi.common.DosEventLog;
|
||||
import com.zdjizhi.common.DosMetricsLog;
|
||||
import com.zdjizhi.common.DosSketchLog;
|
||||
import com.zdjizhi.etl.EtlProcessFunction;
|
||||
import com.zdjizhi.etl.DosDetection;
|
||||
import com.zdjizhi.etl.EtlProcessFunction;
|
||||
import com.zdjizhi.etl.ParseSketchLog;
|
||||
import com.zdjizhi.source.BaselineSource;
|
||||
import com.zdjizhi.utils.FlinkEnvironmentUtils;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.flink.api.common.functions.ReduceFunction;
|
||||
import org.apache.flink.api.common.state.MapStateDescriptor;
|
||||
import org.apache.flink.api.common.typeinfo.Types;
|
||||
import org.apache.flink.api.java.functions.KeySelector;
|
||||
import org.apache.flink.api.java.tuple.Tuple2;
|
||||
import org.apache.flink.api.java.tuple.Tuple4;
|
||||
import org.apache.flink.api.java.typeutils.MapTypeInfo;
|
||||
import org.apache.flink.streaming.api.datastream.*;
|
||||
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
|
||||
import org.apache.flink.streaming.api.windowing.time.Time;
|
||||
@@ -24,8 +17,6 @@ import org.apache.flink.util.OutputTag;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
* @author 94976
|
||||
*/
|
||||
@@ -34,15 +25,13 @@ public class OutputStreamSink {
|
||||
|
||||
public static OutputTag<DosMetricsLog> outputTag = new OutputTag<DosMetricsLog>("traffic server ip metrics"){};
|
||||
|
||||
private static MapStateDescriptor<String, Map<String, Map<String, List<Integer>>>> descriptor = new MapStateDescriptor<>("boradcast-state",
|
||||
Types.STRING,
|
||||
new MapTypeInfo<>(String.class, new MapTypeInfo<>(String.class, (Class<List<Integer>>) (Class<?>) List.class).getTypeClass()));
|
||||
|
||||
public static void finalOutputSink(){
|
||||
try {
|
||||
SingleOutputStreamOperator<DosSketchLog> sketchSource = ParseSketchLog.getSketchSource();
|
||||
DosSketchSink.dosSketchOutputSink(sketchSource);
|
||||
|
||||
SingleOutputStreamOperator<DosSketchLog> middleStream = getMiddleStream();
|
||||
SingleOutputStreamOperator<DosEventLog> dosEventLogOutputStream = getOutputSinkStream(middleStream);
|
||||
DosEventSink.dosEventOutputSink(dosEventLogOutputStream);
|
||||
DosEventSink.dosEventOutputSink(getEventSinkStream(middleStream));
|
||||
TrafficServerIpMetricsSink.sideOutputMetricsSink(middleStream);
|
||||
FlinkEnvironmentUtils.streamExeEnv.execute(CommonConfig.STREAM_EXECUTION_JOB_NAME);
|
||||
} catch (Exception e) {
|
||||
@@ -50,89 +39,25 @@ public class OutputStreamSink {
|
||||
}
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
SingleOutputStreamOperator<DosSketchLog> middleStream = getMiddleStream();
|
||||
SingleOutputStreamOperator<DosEventLog> dosEventLogOutputStream = getOutputSinkStream(middleStream);
|
||||
DosEventSink.dosEventOutputSink(dosEventLogOutputStream);
|
||||
TrafficServerIpMetricsSink.sideOutputMetricsSink(middleStream);
|
||||
dosEventLogOutputStream.print();
|
||||
FlinkEnvironmentUtils.streamExeEnv.execute();
|
||||
}
|
||||
|
||||
private static SingleOutputStreamOperator<DosEventLog> getOutputSinkStream(SingleOutputStreamOperator<DosSketchLog> middleStream){
|
||||
|
||||
BroadcastStream<Map<String, Map<String,List<Integer>>>> broadcast = FlinkEnvironmentUtils.streamExeEnv
|
||||
.addSource(new BaselineSource())
|
||||
.setParallelism(CommonConfig.HBASE_INPUT_PARALLELISM)
|
||||
.broadcast(descriptor);
|
||||
logger.info("广播变量加载成功!!");
|
||||
|
||||
return middleStream.keyBy(new SecondKeySelector())
|
||||
// .window(TumblingEventTimeWindows.of(Time.seconds(CommonConfig.FLINK_WINDOW_MAX_TIME)))
|
||||
.reduce(new SecondReduceFunc())
|
||||
.connect(broadcast)
|
||||
.process(new DosDetection())
|
||||
.setParallelism(CommonConfig.FLINK_SECOND_AGG_PARALLELISM);
|
||||
private static SingleOutputStreamOperator<DosEventLog> getEventSinkStream(SingleOutputStreamOperator<DosSketchLog> middleStream){
|
||||
return middleStream.map(new DosDetection()).setParallelism(CommonConfig.FLINK_DETECTION_MAP_PARALLELISM);
|
||||
}
|
||||
|
||||
private static SingleOutputStreamOperator<DosSketchLog> getMiddleStream(){
|
||||
return ParseSketchLog.getSketchSource()
|
||||
.keyBy(new FirstKeySelector())
|
||||
.keyBy(new KeysSelector())
|
||||
.window(TumblingEventTimeWindows.of(Time.seconds(CommonConfig.FLINK_WINDOW_MAX_TIME)))
|
||||
.process(new EtlProcessFunction())
|
||||
.setParallelism(CommonConfig.FLINK_FIRST_AGG_PARALLELISM);
|
||||
}
|
||||
|
||||
private static String groupUniqSourceIp(String sourceIp1,String sourceIp2){
|
||||
HashSet<String> sourceIpSet = new HashSet<>();
|
||||
Collections.addAll(sourceIpSet, (sourceIp1 + "," + sourceIp2).split(","));
|
||||
if (sourceIpSet.size() > CommonConfig.SOURCE_IP_LIST_LIMIT){
|
||||
return StringUtils.join(takeUniqLimit(sourceIpSet,CommonConfig.SOURCE_IP_LIST_LIMIT),",");
|
||||
}
|
||||
return StringUtils.join(sourceIpSet,",");
|
||||
}
|
||||
|
||||
private static<T> Collection<T> takeUniqLimit(Collection<T> collection, int limit){
|
||||
int i =0;
|
||||
Collection<T> newSet = new HashSet<>();
|
||||
for (T t:collection){
|
||||
if (i < limit){
|
||||
newSet.add(t);
|
||||
i += 1;
|
||||
}
|
||||
}
|
||||
return newSet;
|
||||
}
|
||||
|
||||
private static class FirstKeySelector implements KeySelector<DosSketchLog, Tuple4<String, String, String, String>>{
|
||||
private static class KeysSelector implements KeySelector<DosSketchLog, Tuple2<String, String>>{
|
||||
@Override
|
||||
public Tuple4<String, String, String, String> getKey(DosSketchLog dosSketchLog) throws Exception {
|
||||
return Tuple4.of(
|
||||
dosSketchLog.getCommon_sled_ip(),
|
||||
dosSketchLog.getCommon_data_center(),
|
||||
dosSketchLog.getAttack_type(),
|
||||
dosSketchLog.getDestination_ip());
|
||||
}
|
||||
}
|
||||
|
||||
private static class SecondKeySelector implements KeySelector<DosSketchLog, Tuple2<String, String>> {
|
||||
@Override
|
||||
public Tuple2<String, String> getKey(DosSketchLog dosSketchLog) throws Exception {
|
||||
public Tuple2<String, String> getKey(DosSketchLog dosSketchLog){
|
||||
return Tuple2.of(
|
||||
dosSketchLog.getAttack_type(),
|
||||
dosSketchLog.getDestination_ip());
|
||||
}
|
||||
}
|
||||
|
||||
private static class SecondReduceFunc implements ReduceFunction<DosSketchLog> {
|
||||
@Override
|
||||
public DosSketchLog reduce(DosSketchLog value1, DosSketchLog value2) throws Exception {
|
||||
value1.setSketch_sessions((value1.getSketch_sessions()+value2.getSketch_sessions())/2);
|
||||
value1.setSketch_bytes((value1.getSketch_bytes()+value2.getSketch_bytes())/2);
|
||||
value1.setSketch_packets((value1.getSketch_packets()+value2.getSketch_packets())/2);
|
||||
value1.setSource_ip(groupUniqSourceIp(value1.getSource_ip(),value2.getSource_ip()));
|
||||
return value1;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
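Taken together, the reworked OutputStreamSink wires the job roughly as follows. This is an illustrative sketch assembled from the methods above, not a compilable excerpt of the repository; parallelism settings are omitted:

```java
// Illustrative topology sketch assembled from getMiddleStream / getEventSinkStream above.
SingleOutputStreamOperator<DosSketchLog> sketchSource = ParseSketchLog.getSketchSource();
DosSketchSink.dosSketchOutputSink(sketchSource);                  // flattened sketches -> Kafka

SingleOutputStreamOperator<DosSketchLog> middleStream = sketchSource
        .keyBy(new KeysSelector())                                // (attack_type, destination_ip)
        .window(TumblingEventTimeWindows.of(Time.seconds(CommonConfig.FLINK_WINDOW_MAX_TIME)))
        .process(new EtlProcessFunction());                       // windowed aggregation + metrics side output

TrafficServerIpMetricsSink.sideOutputMetricsSink(middleStream);   // side output -> metrics topic

middleStream
        .map(new DosDetection())                                  // baseline comparison, null when no event
        .filter(Objects::nonNull)
        .map(JsonMapper::toJsonString)
        .addSink(KafkaUtils.getKafkaSink(CommonConfig.KAFKA_OUTPUT_EVENT_TOPIC_NAME));
```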
@@ -1,128 +0,0 @@
|
||||
package com.zdjizhi.source;
|
||||
|
||||
import com.zdjizhi.common.CommonConfig;
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
import org.apache.flink.streaming.api.datastream.DataStream;
|
||||
import org.apache.flink.streaming.api.datastream.DataStreamSource;
|
||||
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
||||
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.*;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.io.ArrayWritable;
|
||||
import org.apache.hadoop.io.IntWritable;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @author wlh
|
||||
*/
|
||||
public class BaselineSource extends RichSourceFunction<Map<String, Map<String,List<Integer>>>> {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(BaselineSource.class);
|
||||
private Connection conn = null;
|
||||
private Table table = null;
|
||||
private Scan scan = null;
|
||||
|
||||
@Override
|
||||
public void open(Configuration parameters) throws Exception {
|
||||
|
||||
org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create();
|
||||
|
||||
config.set("hbase.zookeeper.quorum", CommonConfig.HBASE_ZOOKEEPER_QUORUM);
|
||||
config.set("hbase.client.retries.number", "3");
|
||||
config.set("hbase.bulkload.retries.number", "3");
|
||||
config.set("zookeeper.recovery.retry", "3");
|
||||
config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, CommonConfig.HBASE_CLIENT_OPERATION_TIMEOUT);
|
||||
config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CommonConfig.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
|
||||
|
||||
TableName tableName = TableName.valueOf(CommonConfig.HBASE_BASELINE_TABLE_NAME);
|
||||
conn = ConnectionFactory.createConnection(config);
|
||||
table = conn.getTable(tableName);
|
||||
scan = new Scan().setAllowPartialResults(true).setLimit(CommonConfig.HBASE_BASELINE_TOTAL_NUM);
|
||||
logger.info("连接hbase成功,正在读取baseline数据");
|
||||
|
||||
// .addFamily(Bytes.toBytes(CommonConfig.HBASE_BASELINE_FAMLIY_NAME));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws Exception {
|
||||
super.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run(SourceContext<Map<String, Map<String,List<Integer>>>> sourceContext) throws Exception {
|
||||
logger.info("开始读取baseline数据");
|
||||
ResultScanner rs = table.getScanner(scan);
|
||||
// Map<String, List<Integer>[]> baselineMap = new HashMap<>();
|
||||
Map<String, Map<String,List<Integer>>> baselineMap = new HashMap<>();
|
||||
for (Result result : rs) {
|
||||
Map<String, List<Integer>> floodTypeMap = new HashMap<>();
|
||||
String rowkey = Bytes.toString(result.getRow());
|
||||
ArrayList<Integer> tcp = getArraylist(result,"TCP SYN Flood", "session_num");
|
||||
ArrayList<Integer> udp = getArraylist(result,"UDP Flood", "session_num");
|
||||
ArrayList<Integer> icmp = getArraylist(result,"ICMP Flood", "session_num");
|
||||
ArrayList<Integer> dns = getArraylist(result,"DNS Amplification", "session_num");
|
||||
floodTypeMap.put("TCP SYN Flood",tcp);
|
||||
floodTypeMap.put("UDP Flood",udp);
|
||||
floodTypeMap.put("ICMP Flood",icmp);
|
||||
floodTypeMap.put("DNS Amplification",dns);
|
||||
// List[] arr = new ArrayList[]{tcp,udp,icmp,dns};
|
||||
baselineMap.put(rowkey,floodTypeMap);
|
||||
}
|
||||
sourceContext.collect(baselineMap);
|
||||
logger.info("格式化baseline数据成功,读取IP共:{}",baselineMap.size());
|
||||
}
|
||||
|
||||
private static ArrayList<Integer> getArraylist(Result result,String family,String qualifier) throws IOException {
|
||||
if (!result.containsColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier))){
|
||||
return null;
|
||||
}
|
||||
ArrayWritable w = new ArrayWritable(IntWritable.class);
|
||||
w.readFields(new DataInputStream(new ByteArrayInputStream(result.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier)))));
|
||||
return fromWritable(w);
|
||||
}
|
||||
|
||||
private static ArrayList<Integer> fromWritable(ArrayWritable writable) {
|
||||
Writable[] writables = writable.get();
|
||||
ArrayList<Integer> list = new ArrayList<>(writables.length);
|
||||
for (Writable wrt : writables) {
|
||||
list.add(((IntWritable)wrt).get());
|
||||
}
|
||||
return list;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel() {
|
||||
try {
|
||||
if (table != null) {
|
||||
table.close();
|
||||
}
|
||||
if (conn != null) {
|
||||
conn.close();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
|
||||
env.enableCheckpointing(5000);
|
||||
DataStreamSource<Map<String, Map<String,List<Integer>>>> mapDataStreamSource = env.addSource(new BaselineSource());
|
||||
DataStream<Map<String, Map<String,List<Integer>>>> broadcast = mapDataStreamSource.broadcast();
|
||||
mapDataStreamSource.print();
|
||||
env.execute();
|
||||
}
|
||||
}
|
||||
@@ -9,6 +9,9 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
|
||||
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* @author wlh
|
||||
*/
|
||||
public class DosSketchSource {
|
||||
|
||||
private static StreamExecutionEnvironment streamExeEnv = FlinkEnvironmentUtils.streamExeEnv;
|
||||
|
||||
src/main/java/com/zdjizhi/utils/CollectionUtils.java (new file, 23 lines)
@@ -0,0 +1,23 @@
|
||||
package com.zdjizhi.utils;

import java.util.Collection;
import java.util.HashSet;

/**
 * @author wlh
 * Extended collection handling utilities
 */
public class CollectionUtils {

    public static <T> Collection<T> takeUniqueLimit(Collection<T> collection, int limit){
        int i = 0;
        Collection<T> newSet = new HashSet<>();
        for (T t : collection){
            if (i < limit){
                newSet.add(t);
                i += 1;
            }
        }
        return newSet;
    }
}
|
||||
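The loop above mirrors what was previously inlined in OutputStreamSink; with the Stream API the same "keep at most N" cap can be written in one line (element order in a HashSet is unspecified either way, so which entries survive the limit is arbitrary in both versions):

```java
// Stream-based alternative to takeUniqueLimit with the same behaviour.
public static <T> java.util.Collection<T> takeUniqueLimit(java.util.Collection<T> collection, int limit) {
    return collection.stream().limit(limit).collect(java.util.stream.Collectors.toSet());
}
```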
@@ -2,8 +2,6 @@ package com.zdjizhi.utils;
|
||||
|
||||
import com.zdjizhi.common.CommonConfig;
|
||||
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
||||
import org.apache.flink.table.api.EnvironmentSettings;
|
||||
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
|
||||
|
||||
|
||||
/**
|
||||
@@ -12,16 +10,8 @@ import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
|
||||
public class FlinkEnvironmentUtils {
|
||||
public static StreamExecutionEnvironment streamExeEnv = StreamExecutionEnvironment.getExecutionEnvironment();
|
||||
|
||||
|
||||
public static StreamTableEnvironment getStreamTableEnv() {
|
||||
static {
|
||||
streamExeEnv.setParallelism(CommonConfig.STREAM_EXECUTION_ENVIRONMENT_PARALLELISM);
|
||||
|
||||
EnvironmentSettings settings = EnvironmentSettings.newInstance()
|
||||
.useBlinkPlanner()
|
||||
.inStreamingMode()
|
||||
.build();
|
||||
|
||||
return StreamTableEnvironment.create(streamExeEnv, settings);
|
||||
}
|
||||
|
||||
|
||||
|
||||
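After this change FlinkEnvironmentUtils presumably reduces to a single shared stream environment configured once in a static initializer, roughly as follows (imports omitted):

```java
public class FlinkEnvironmentUtils {

    public static StreamExecutionEnvironment streamExeEnv =
            StreamExecutionEnvironment.getExecutionEnvironment();

    static {
        streamExeEnv.setParallelism(CommonConfig.STREAM_EXECUTION_ENVIRONMENT_PARALLELISM);
    }
}
```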
@@ -1,5 +1,123 @@
|
||||
package com.zdjizhi.utils;
|
||||
|
||||
import com.zdjizhi.common.CommonConfig;
|
||||
import org.apache.flink.api.java.tuple.Tuple2;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.*;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.io.ArrayWritable;
|
||||
import org.apache.hadoop.io.IntWritable;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
* @author wlh
|
||||
*/
|
||||
public class HbaseUtils {
|
||||
private static final Logger logger = LoggerFactory.getLogger(HbaseUtils.class);
|
||||
private static Table table = null;
|
||||
private static Scan scan = null;
|
||||
private static ArrayList<String> floodTypeList = new ArrayList<>();
|
||||
|
||||
static {
|
||||
floodTypeList.add("TCP SYN Flood");
|
||||
floodTypeList.add("UDP Flood");
|
||||
floodTypeList.add("ICMP Flood");
|
||||
floodTypeList.add("DNS Amplification");
|
||||
}
|
||||
|
||||
private static void prepareHbaseEnv() throws IOException {
|
||||
org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create();
|
||||
|
||||
config.set("hbase.zookeeper.quorum", CommonConfig.HBASE_ZOOKEEPER_QUORUM);
|
||||
config.set("hbase.client.retries.number", "3");
|
||||
config.set("hbase.bulkload.retries.number", "3");
|
||||
config.set("zookeeper.recovery.retry", "3");
|
||||
config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, CommonConfig.HBASE_CLIENT_OPERATION_TIMEOUT);
|
||||
config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CommonConfig.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
|
||||
|
||||
TableName tableName = TableName.valueOf(CommonConfig.HBASE_BASELINE_TABLE_NAME);
|
||||
Connection conn = ConnectionFactory.createConnection(config);
|
||||
table = conn.getTable(tableName);
|
||||
scan = new Scan().setAllowPartialResults(true).setLimit(CommonConfig.HBASE_BASELINE_TOTAL_NUM);
|
||||
logger.info("连接hbase成功,正在读取baseline数据");
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
Map<String, Map<String, Tuple2<ArrayList<Integer>, Integer>>> baselineMap = readFromHbase();
|
||||
Set<String> keySet = baselineMap.keySet();
|
||||
for (String key : keySet) {
|
||||
Map<String, Tuple2<ArrayList<Integer>, Integer>> stringTuple2Map = baselineMap.get(key);
|
||||
Set<String> strings = stringTuple2Map.keySet();
|
||||
for (String s:strings){
|
||||
Tuple2<ArrayList<Integer>, Integer> arrayListIntegerTuple2 = stringTuple2Map.get(s);
|
||||
System.out.println(key+"---"+s+"---"+arrayListIntegerTuple2.f0+"---"+arrayListIntegerTuple2.f1);
|
||||
}
|
||||
}
|
||||
System.out.println(baselineMap.size());
|
||||
}
|
||||
|
||||
public static Map<String, Map<String, Tuple2<ArrayList<Integer>, Integer>>> readFromHbase() {
|
||||
Map<String, Map<String, Tuple2<ArrayList<Integer>, Integer>>> baselineMap = new HashMap<>();
|
||||
try {
|
||||
prepareHbaseEnv();
|
||||
logger.info("开始读取baseline数据");
|
||||
ResultScanner rs = table.getScanner(scan);
|
||||
for (Result result : rs) {
|
||||
Map<String, Tuple2<ArrayList<Integer>, Integer>> floodTypeMap = new HashMap<>();
|
||||
String rowkey = Bytes.toString(result.getRow());
|
||||
for (String type:floodTypeList){
|
||||
ArrayList<Integer> sessionRate = getArraylist(result, type, "session_rate");
|
||||
if (sessionRate != null && !sessionRate.isEmpty()){
|
||||
Integer defaultValue = getDefaultValue(result, type, "session_rate_default_value");
|
||||
floodTypeMap.put(type,Tuple2.of(sessionRate, defaultValue));
|
||||
}
|
||||
}
|
||||
baselineMap.put(rowkey, floodTypeMap);
|
||||
}
|
||||
logger.info("格式化baseline数据成功,读取IP共:{}", baselineMap.size());
|
||||
} catch (Exception e) {
|
||||
logger.error("读取hbase数据失败", e);
|
||||
}
|
||||
return baselineMap;
|
||||
}
|
||||
|
||||
private static Integer getDefaultValue(Result result, String family, String qualifier) {
|
||||
byte[] value = result.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier));
|
||||
if (value != null){
|
||||
return Bytes.toInt(value);
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
private static ArrayList<Integer> getArraylist(Result result, String family, String qualifier) throws IOException {
|
||||
if (containsColumn(result, family, qualifier)) {
|
||||
ArrayWritable w = new ArrayWritable(IntWritable.class);
|
||||
w.readFields(new DataInputStream(new ByteArrayInputStream(result.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier)))));
|
||||
return fromWritable(w);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private static ArrayList<Integer> fromWritable(ArrayWritable writable) {
|
||||
Writable[] writables = writable.get();
|
||||
ArrayList<Integer> list = new ArrayList<>(writables.length);
|
||||
for (Writable wrt : writables) {
|
||||
list.add(((IntWritable) wrt).get());
|
||||
}
|
||||
return list;
|
||||
}
|
||||
|
||||
private static boolean containsColumn(Result result, String family, String qualifier) {
|
||||
return result.containsColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
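From readFromHbase and getArraylist, the assumed table layout is: row key = destination IP, one column family per flood type, a session_rate qualifier holding a Hadoop ArrayWritable of 144 IntWritable slots (one per 10-minute window of the day), and a session_rate_default_value qualifier holding a P95 fallback as a 4-byte int. A hypothetical write-side sketch that would produce rows this reader can consume (the actual producer of dos:ddos_traffic_baselines is not part of this diff):

```java
// Hypothetical writer for one baseline cell, matching the read path in HbaseUtils;
// the row layout is inferred from readFromHbase, everything else is assumed.
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;

public class BaselineWriterSketch {

    static void writeBaseline(Connection conn, String destinationIp, String floodType,
                              int[] perSlotSessionRate, int p95Default) throws Exception {
        // Serialize the 144 per-slot values the same way getArraylist reads them back.
        IntWritable[] slots = new IntWritable[perSlotSessionRate.length];
        for (int i = 0; i < slots.length; i++) {
            slots[i] = new IntWritable(perSlotSessionRate[i]);
        }
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        new ArrayWritable(IntWritable.class, slots).write(new DataOutputStream(buf));

        try (Table table = conn.getTable(TableName.valueOf("dos:ddos_traffic_baselines"))) {
            Put put = new Put(Bytes.toBytes(destinationIp));
            put.addColumn(Bytes.toBytes(floodType), Bytes.toBytes("session_rate"), buf.toByteArray());
            put.addColumn(Bytes.toBytes(floodType), Bytes.toBytes("session_rate_default_value"),
                    Bytes.toBytes(p95Default));
            table.put(put);
        }
    }
}
```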
@@ -8,14 +8,20 @@ public class IpUtils {
|
||||
* Utility class wrapping the IP geolocation lookup library
|
||||
*/
|
||||
public static IpLookup ipLookup = new IpLookup.Builder(false)
|
||||
// .loadDataFileV4(CommonConfig.IP_MMDB_PATH + "ip_v4.mmdb")
|
||||
// .loadDataFileV6(CommonConfig.IP_MMDB_PATH + "ip_v6.mmdb")
|
||||
.loadDataFileV4(CommonConfig.IP_MMDB_PATH + "ip_v4.mmdb")
|
||||
.loadDataFileV6(CommonConfig.IP_MMDB_PATH + "ip_v6.mmdb")
|
||||
.loadDataFilePrivateV4(CommonConfig.IP_MMDB_PATH + "ip_private_v4.mmdb")
|
||||
// .loadDataFilePrivateV6(CommonConfig.IP_MMDB_PATH + "ip_private_v6.mmdb")
|
||||
.loadDataFilePrivateV6(CommonConfig.IP_MMDB_PATH + "ip_private_v6.mmdb")
|
||||
.build();
|
||||
|
||||
public static void main(String[] args) {
|
||||
System.out.println(ipLookup.countryLookup("61.128.159.186"));
|
||||
System.out.println(ipLookup.countryLookup("49.7.115.37"));
|
||||
|
||||
// String ips = "192.168.50.23,192.168.50.45,192.168.56.9,192.168.56.8,192.168.50.58,192.168.56.7,192.168.56.6,192.168.50.40,192.168.50.19,192.168.50.6,192.168.50.4,192.168.56.17,192.168.50.27,192.168.50.26,192.168.50.18,192.168.56.3,192.168.56.10";
|
||||
// for (String ip:ips.split(",")){
|
||||
// System.out.println(ip+"--"+ipLookup.countryLookup(ip));
|
||||
// }
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -11,6 +11,9 @@ public class KafkaUtils {
|
||||
private static Properties getKafkaSinkProperty(){
|
||||
Properties propertiesproducer = new Properties();
|
||||
propertiesproducer.setProperty("bootstrap.servers", CommonConfig.KAFKA_OUTPUT_BOOTSTRAP_SERVERS);
|
||||
// propertiesproducer.setProperty("security.protocol", "SASL_PLAINTEXT");
|
||||
// propertiesproducer.setProperty("sasl.mechanism", "PLAIN");
|
||||
// propertiesproducer.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"galaxy2019\";");
|
||||
|
||||
return propertiesproducer;
|
||||
}
|
||||
|
||||
@@ -1,41 +1,93 @@
|
||||
#Flink environment parallelism; lower priority than operator parallelism, used only when an operator does not set its own
stream.execution.environment.parallelism=1
stream.execution.job.name=dos-detection-job

#Flink job name, normally unchanged
stream.execution.job.name=DOS-DETECTION-APPLICATION

#Input Kafka source parallelism
kafka.input.parallelism=1

#Input Kafka topic name
kafka.input.topic.name=DOS-SKETCH-LOG
kafka.input.bootstrap.servers=192.168.44.12:9092
kafka.input.scan.startup.mode=latest-offset
kafka.input.group.id=2108041426
#kafka.input.group.id=test

#Input Kafka bootstrap servers
#kafka.input.bootstrap.servers=192.168.44.12:9092
kafka.input.bootstrap.servers=192.168.44.11:9092,192.168.44.14:9092,192.168.44.15:9092

#Kafka consumer group id
kafka.input.group.id=2109061121
#kafka.input.group.id=dos-detection-job-210813-1

#Output Kafka metrics sink parallelism
kafka.output.metric.parallelism=1
kafka.output.metric.topic.name=TRAFFIC-TOP-DESTINATION-IP-METRICS-LOG
kafka.output.event.parallelism=1
kafka.output.event.topic.name=DOS-EVENT-LOG
kafka.output.bootstrap.servers=192.168.44.12:9092

hbase.input.parallelism=1
hbase.zookeeper.quorum=192.168.44.12:2181
#Output Kafka metrics topic name
#kafka.output.metric.topic.name=TRAFFIC-TOP-DESTINATION-IP-METRICS-LOG
kafka.output.metric.topic.name=test

#Output Kafka event sink parallelism
kafka.output.event.parallelism=1

#Output Kafka event topic name
#kafka.output.event.topic.name=DOS-EVENT-LOG
kafka.output.event.topic.name=test

#Sketch log output topic name and parallelism
#kafka.output.sketch.topic.name=FLATTEN-DOS-SKETCH-LOG
kafka.output.sketch.topic.name=test
kafka.output.sketch.parallelism=1

#Output Kafka bootstrap servers
kafka.output.bootstrap.servers=192.168.44.33:9092
#kafka.output.bootstrap.servers=192.168.44.11:9092,192.168.44.14:9092,192.168.44.15:9092

#ZooKeeper quorum address
#hbase.zookeeper.quorum=192.168.44.12:2181
hbase.zookeeper.quorum=192.168.44.11:2181,192.168.44.14:2181,192.168.44.15:2181

#HBase client timeouts
hbase.client.operation.timeout=30000
hbase.client.scanner.timeout.period=30000

hbase.baseline.table.name=ddos_traffic_baselines
##HBase baseline table name
hbase.baseline.table.name=dos:ddos_traffic_baselines

#Limit on the number of baseline rows read
hbase.baseline.total.num=1000000

#Aggregation parallelism (two keys)
flink.first.agg.parallelism=1
flink.second.agg.parallelism=1
flink.watermark.max.orderness=1
flink.window.max.time=600

#Detection map parallelism
flink.detection.map.parallelism=1

#Watermark max out-of-orderness
flink.watermark.max.orderness=1

#Computation window size, default 600 s
flink.window.max.time=10

#Limit on distinct source IPs in a DoS event result
source.ip.list.limit=10000

#Number of partitions derived from the destination IP, default 10000, normally unchanged
destination.ip.partition.num=10000

data.center.id.num=15

ip.mmdb.path=D:\\data\\dat_test\\
#ip.mmdb.path=/home/bigdata/wlh/topology/dos-detection/dat/
#Path to the IP mmdb databases
ip.mmdb.path=D:\\data\\dat\\
#ip.mmdb.path=ip.mmdb.path=/home/ceiec/topology/dat/

#Sensitivity threshold; rates below this value never trigger an alert
sensitivity.threshold=100

#Thresholds for judging a DoS attack against the baseline
baseline.sessions.minor.threshold=0.1
baseline.sessions.warning.threshold=0.5
baseline.sessions.major.threshold=1
baseline.sessions.severe.threshold=3
baseline.sessions.critical.threshold=8

#Baseline refresh interval, default 7 days
baseline.threshold.schedule.days=7
@@ -1,17 +0,0 @@
|
||||
package com.zdjizhi.common;
|
||||
|
||||
import org.apache.flink.table.functions.TableFunction;
|
||||
import org.apache.flink.types.Row;
|
||||
|
||||
public class UdtfTest extends TableFunction<Row> {
|
||||
|
||||
public void eval(Row[] rows) {
|
||||
for (Row row : rows) {
|
||||
collect(row);
|
||||
}
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
||||
}
|
||||
}
|
||||