Compare commits

2 Commits

| Author | SHA1 | Date |
|---|---|---|
| | 6096ff2249 | |
| | df43bb9e54 | |
@@ -31,7 +31,9 @@ public class ApplicationConfig {
     public static final String DRUID_ATTACKTYPE_UDP_FLOOD = ConfigUtils.getStringProperty("druid.attacktype.udpflood");
     public static final String DRUID_ATTACKTYPE_ICMP_FLOOD = ConfigUtils.getStringProperty("druid.attacktype.icmpflood");
     public static final String DRUID_ATTACKTYPE_DNS_AMPL = ConfigUtils.getStringProperty("druid.attacktype.dnsamplification");
+    public static final String DRUID_ATTACKTYPE_NTP_FLOOD = ConfigUtils.getStringProperty("druid.attacktype.ntpflood");
     public static final String DRUID_SERVERIP_COLUMN_NAME = ConfigUtils.getStringProperty("druid.columnname.serverip");
+    public static final String DRUID_VSYSID_COLUMN_NAME = ConfigUtils.getStringProperty("druid.columnname.vsysid");
     public static final String DRUID_ATTACKTYPE_COLUMN_NAME = ConfigUtils.getStringProperty("druid.columnname.attacktype");
     public static final String DRUID_RECVTIME_COLUMN_NAME = ConfigUtils.getStringProperty("druid.columnname.recvtime");
     public static final String DRUID_COLUMNNAME_PARTITION_NUM = ConfigUtils.getStringProperty("druid.columnname.partition.num");
@@ -48,21 +48,23 @@ public class DruidData {
     }

     public static Map<String, List<Map<String, Object>>> selectAll(List<Map<String, Object>> result) {
-        Map<String, List<Map<String, Object>>> allIpDataList = new HashMap<>();
-        ArrayList<String> ipList = new ArrayList<>();
+        Map<String, List<Map<String, Object>>> allKeyDataList = new HashMap<>();
+        ArrayList<String> keyList = new ArrayList<>();

         for (Map<String, Object> rowData : result) {
             String ip = (String) rowData.get(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME);
-            if (!ipList.contains(ip)) {
-                ipList.add(ip);
-                List<Map<String, Object>> ipData = new ArrayList<>();
-                allIpDataList.put(ip, ipData);
+            String vsysId = Long.toString((Long) rowData.get(ApplicationConfig.DRUID_VSYSID_COLUMN_NAME));
+            String key = ip + "-" + vsysId;
+            if (!keyList.contains(key)) {
+                keyList.add(key);
+                List<Map<String, Object>> keyData = new ArrayList<>();
+                allKeyDataList.put(key, keyData);
             }
             rowData.remove(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME);
-            allIpDataList.get(ip).add(rowData);
+            allKeyDataList.get(key).add(rowData);
         }

-        return allIpDataList;
+        return allKeyDataList;
     }

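For readers following the refactor: selectAll now buckets rows by a composite "ip-vsysId" string instead of the bare server IP. A minimal standalone sketch of that keying idea, using hypothetical class and column names rather than the project's ApplicationConfig constants:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KeyedGrouping {
    // Group Druid rows by "serverIp-vsysId", mirroring the new selectAll() behaviour.
    // "server_ip" and "vsys_id" are placeholder column names for this sketch.
    static Map<String, List<Map<String, Object>>> groupByIpAndVsys(List<Map<String, Object>> rows) {
        Map<String, List<Map<String, Object>>> grouped = new HashMap<>();
        for (Map<String, Object> row : rows) {
            String ip = (String) row.get("server_ip");
            String vsysId = Long.toString((Long) row.get("vsys_id"));
            String key = ip + "-" + vsysId;
            // computeIfAbsent avoids the separate keyList membership check used in the diff
            grouped.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
        }
        return grouped;
    }
}
```

The behaviour matches the committed code; computeIfAbsent simply replaces the keyList/contains() bookkeeping with a single map lookup per row.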
@@ -158,9 +160,10 @@ public class DruidData {
                 + ") AND " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
                 + " < MILLIS_TO_TIMESTAMP(" + endTime + ")";

-        return "SELECT "+ ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME
-                + ", "+ ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME
-                + ", AVG("+ ApplicationConfig.BASELINE_METRIC_TYPE + ") as " + ApplicationConfig.BASELINE_METRIC_TYPE
+        String sql = "SELECT " + ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME
+                + ", " + ApplicationConfig.DRUID_VSYSID_COLUMN_NAME
+                + ", " + ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME
+                + ", AVG(" + ApplicationConfig.BASELINE_METRIC_TYPE + ") as " + ApplicationConfig.BASELINE_METRIC_TYPE
                 + ", " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
                 + " FROM " + ApplicationConfig.DRUID_TABLE
                 + " WHERE " + ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME + " IN " + attackList
@@ -168,8 +171,11 @@ public class DruidData {
                 + " AND " + timeFilter
                 + " AND " + partitionFilter
                 + " GROUP BY (" + ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME
+                + ", " + ApplicationConfig.DRUID_VSYSID_COLUMN_NAME
                 + ", " + ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME
                 + ", " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME + ")";
+        System.out.println(sql);
+        return sql;
     }

     /**
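Assuming the column and table names that appear in the properties file later in this diff, the rewritten builder would emit SQL of roughly this shape; the metric name, attack-type list and filter values below are illustrative placeholders only:

```java
public class ExampleBaselineQuery {
    // Shape of the query produced after the change, assuming the property values shown
    // later in this diff (destination_ip, vsys_id, attack_type, __time,
    // dos_sketch_top_server_ip). The metric "bps", the attack list and the time/partition
    // filters are placeholders, not values taken from the code.
    static final String EXAMPLE_SQL =
            "SELECT destination_ip, vsys_id, attack_type, AVG(bps) as bps, __time"
          + " FROM dos_sketch_top_server_ip"
          + " WHERE attack_type IN ('UDP Flood', 'ICMP Flood')"
          + " AND __time >= MILLIS_TO_TIMESTAMP(1711522800000)"
          + " AND __time < MILLIS_TO_TIMESTAMP(1711526400000)"
          + " AND partition_num < 10000"
          + " GROUP BY (destination_ip, vsys_id, attack_type, __time)";

    public static void main(String[] args) {
        System.out.println(EXAMPLE_SQL);
    }
}
```

Adding vsys_id to both the SELECT list and the GROUP BY keeps the query valid, since every non-aggregated column must appear in the grouping.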
@@ -23,7 +23,8 @@ public class BaselineGeneration {
             ApplicationConfig.DRUID_ATTACKTYPE_TCP_SYN_FLOOD,
             ApplicationConfig.DRUID_ATTACKTYPE_ICMP_FLOOD,
             ApplicationConfig.DRUID_ATTACKTYPE_UDP_FLOOD,
-            ApplicationConfig.DRUID_ATTACKTYPE_DNS_AMPL
+            ApplicationConfig.DRUID_ATTACKTYPE_DNS_AMPL,
+            ApplicationConfig.DRUID_ATTACKTYPE_NTP_FLOOD
     );
     private static final Integer BASELINE_POINT_NUM =
             ApplicationConfig.BASELINE_RANGE_DAYS * 24 * (60/ApplicationConfig.HISTORICAL_GRAD);
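For context on BASELINE_POINT_NUM above: assuming BASELINE_RANGE_DAYS and HISTORICAL_GRAD resolve to the read.historical.days and historical.grad properties changed later in this diff (30 days, 10-minute aggregation), the constant works out to 30 * 24 * (60 / 10) = 4320 baseline points, one per 10-minute bucket.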
@@ -7,7 +7,6 @@ import cn.mesalab.utils.DruidUtils;
 import cn.mesalab.utils.HbaseUtils;
-import cn.mesalab.utils.RetryUtils;
 import cn.mesalab.utils.SeriesUtils;
 import com.google.common.collect.Lists;
 import io.vavr.Tuple3;
 import org.apache.calcite.avatica.AvaticaConnection;
 import org.apache.calcite.avatica.AvaticaStatement;
@@ -83,35 +82,38 @@ public class BaselineSingleThread extends Thread {
             batchDruidData = new HashMap<>();
         }

-        LOG.info("完成数据处理:获取Server IP:" + batchDruidData.size() +
+        LOG.info("完成数据处理:获取Server IP + vsys_id:" + batchDruidData.size() +
                 " 运行时间:" + (System.currentTimeMillis() - start));


         // 基线生成
         List<Put> putList = new ArrayList<>();
         for(String attackType: attackTypeList){
-            for(String ip: batchDruidData.keySet()){
-                // 筛选指定ip指定攻击类型的数据
-                List<Map<String, Object>> ipDruidData = batchDruidData.get(ip).stream()
+            for(String key: batchDruidData.keySet()){
+                // 筛选指定key(ip+vsys_id)指定攻击类型的数据
+                List<Map<String, Object>> keyDruidData = batchDruidData.get(key).stream()
                         .filter(i -> i.get(ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME).equals(attackType)).collect(Collectors.toList());
                 // baseline生成
-                Tuple3<int[], Integer, Integer> tuple = generateSingleIpBaseline(ip, ipDruidData);
+                Tuple3<int[], Integer, Integer> tuple = generateSingleBaseline(key, keyDruidData);
                 if(tuple!=null){
-                    int[] ipBaseline = tuple._1;
+                    int[] baseline = tuple._1;
                     int generateType = tuple._2;
                     int zeroReplaceValue = tuple._3;
-                    if ((BASELINE_SAVE_LEVEL >= generateType) && (ipBaseline!= null ) && (ip.length()>0)){
-                        hbaseUtils.cachedInPut(putList, ip, ipBaseline, attackType, ApplicationConfig.BASELINE_METRIC_TYPE);
-                        hbaseUtils.cachedInPut(putList, ip, generateType, attackType,
+
+                    List<String> keys = Arrays.asList(key.split("-"));
+                    keys.remove("");
+                    if ((BASELINE_SAVE_LEVEL >= generateType) && (baseline!= null ) && (keys.size() == 2)){
+                        hbaseUtils.cachedInPut(putList, key, baseline, attackType, ApplicationConfig.BASELINE_METRIC_TYPE);
+                        hbaseUtils.cachedInPut(putList, key, generateType, attackType,
                                 ApplicationConfig.BASELINE_METRIC_TYPE + "_" + ApplicationConfig.HBASE_BASELINE_GENERATION_TYPE_SUFFIX);
-                        hbaseUtils.cachedInPut(putList, ip, zeroReplaceValue, attackType,
+                        hbaseUtils.cachedInPut(putList, key, zeroReplaceValue, attackType,
                                 ApplicationConfig.BASELINE_METRIC_TYPE + "_" + ApplicationConfig.HBASE_BASELINE_ZERO_REPLACE_VALUE_SUFFIX);
                     }
                 }
             }
         }
         try {
-            LOG.info("MONITOR-IP频率分段统计:" + frequencyBinCounter);
+            LOG.info("MONITOR-IP-vsysID频率分段统计:" + frequencyBinCounter);
             LOG.info("MONITOR-生成类别统计:" + generateTypeCounter);
             LOG.info("MONITOR-无baseline生成的个数:" + discardBaselineCounter + " 其中包括IP共:" + discardIpList.size());
             hbaseTable.put(putList);
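One behavioural note on the key validation added above: Arrays.asList wraps the array returned by key.split("-") in a fixed-size list, so keys.remove("") merely returns false when no empty segment is present, but would throw UnsupportedOperationException if one ever were. A sketch of a mutable-copy variant, illustrative only and not the committed code:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class KeyValidation {
    // Split an "ip-vsysId" key and drop empty segments without relying on a fixed-size list.
    static List<String> splitKey(String key) {
        List<String> parts = new ArrayList<>(Arrays.asList(key.split("-")));
        parts.removeIf(String::isEmpty);  // safe on a mutable ArrayList copy
        return parts;
    }

    public static void main(String[] args) {
        System.out.println(splitKey("192.168.44.12-1"));  // [192.168.44.12, 1]
    }
}
```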
@@ -151,36 +153,36 @@ public class BaselineSingleThread extends Thread {

     /**
      * 单ip baseline生成逻辑
-     * @param ip
-     * @param ipDruidData
+     * @param key
+     * @param keyDruidData
      * @return baseline序列,长度为 60/HISTORICAL_GRAD*24;
      * baselineGenerationType:
-     *     1: 高频IP
-     *     2: 低频有周期IP
-     *     3:其他类型IP, 采用百分位阈值基线
+     *     1: 高频key
+     *     2: 低频有周期key
+     *     3:其他类型key, 采用百分位阈值基线
      */
-    private Tuple3<int[], Integer, Integer> generateSingleIpBaseline(String ip, List<Map<String, Object>> ipDruidData){
-        // 无数据(ip-攻击类型)不计算
-        if (ipDruidData.size()==0){
-            updateDiscardCounter(ip);
+    private Tuple3<int[], Integer, Integer> generateSingleBaseline(String key, List<Map<String, Object>> keyDruidData){
+        // 无数据不计算
+        if (keyDruidData.size()==0){
+            updateDiscardCounter(key);
             return null;
         }
-        List<Integer> originSeries = ipDruidData.stream().map(i ->
+        List<Integer> originSeries = keyDruidData.stream().map(i ->
                 Integer.valueOf(i.get(ApplicationConfig.BASELINE_METRIC_TYPE).toString())).collect(Collectors.toList());
         List<Integer> originNonZeroSeries = originSeries.stream().filter(i->i>0).collect(Collectors.toList());

         // 全零(ip-攻击类型)不计算
         if(originNonZeroSeries.size()==0){
-            updateDiscardCounter(ip);
+            updateDiscardCounter(key);
             return null;
         }

-        int ipPercentile = SeriesUtils.percentile(originSeries, ApplicationConfig.BASELINE_RATIONAL_PERCENTILE);
+        int percentile = SeriesUtils.percentile(originSeries, ApplicationConfig.BASELINE_RATIONAL_PERCENTILE);
         int baselineGenerationType;
         int[] baselineArr = new int[baselinePointNum];

         // 时间序列缺失值补0
-        List<Map<String, Object>> completSeries = SeriesUtils.complementSeries(ipDruidData);
+        List<Map<String, Object>> completSeries = SeriesUtils.complementSeries(keyDruidData);
         List<Integer>series = completSeries.stream().map(
                 i -> Integer.valueOf(i.get(ApplicationConfig.BASELINE_METRIC_TYPE).toString())).collect(Collectors.toList());

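SeriesUtils itself is not part of this diff, so as a rough illustration of the percentile step used above (and for the non-zero default value further down), here is a nearest-rank percentile sketch; the real SeriesUtils.percentile may differ in rounding or interpolation:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class PercentileSketch {
    // Nearest-rank percentile over an integer series; p is in [0, 1].
    // Only an illustration of the kind of helper the baseline code relies on.
    static int percentile(List<Integer> series, double p) {
        List<Integer> sorted = new ArrayList<>(series);
        Collections.sort(sorted);
        int rank = (int) Math.ceil(p * sorted.size());  // 1-based nearest rank
        return sorted.get(Math.max(0, rank - 1));
    }

    public static void main(String[] args) {
        List<Integer> series = List.of(0, 0, 3, 7, 9, 12, 50);
        System.out.println(percentile(series, 0.95));  // 50 for this toy series
    }
}
```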
@@ -190,7 +192,7 @@ public class BaselineSingleThread extends Thread {
         // 异常值剔除
         double exceptionPercentile = SeriesUtils.percentile(series, ApplicationConfig.BASELINE_EXECEPTION_PERCENTILE);
         double exceptionFillPercentile = SeriesUtils.percentile(series, ApplicationConfig.BASELINE_EXCECPTION_FILL_PERCENTILE);
-        LOG.debug(ip + ": series-" + series);
+        LOG.debug(key + ": series-" + series);
         for(int i=0; i<series.size(); i++){
             if(series.get(i) > exceptionPercentile){
                 series.set(i, (int) exceptionFillPercentile);
@@ -203,7 +205,7 @@ public class BaselineSingleThread extends Thread {
         double p50 = SeriesUtils.percentile(series, 0.50);

         // 无周期性
-        float ipFrequency = ipDruidData.size() / (float) completSeries.size();
+        float ipFrequency = keyDruidData.size() / (float) completSeries.size();
         updateLogFrequencyCounter(ipFrequency);
         // 频率判断
         if(ipFrequency >ApplicationConfig.BASELINE_HISTORICAL_FREQUENCY_THREAD ){
@@ -220,7 +222,7 @@ public class BaselineSingleThread extends Thread {
         // 计算默认值-非零数据的百分位数
         int defaultValue = SeriesUtils.percentile(originNonZeroSeries, ApplicationConfig.BASELINE_RATIONAL_PERCENTILE);
         if(defaultValue == 0){
-            LOG.error(ip + "-" + "baseline default value is 0 !");
+            LOG.error(key + "-" + "baseline default value is 0 !");
         }

         return new Tuple3<>(baselineArr, baselineGenerationType, defaultValue);
@@ -1 +1 @@
-package cn.mesalab.utils;
+package cn.mesalab.utils;
(The two lines are textually identical; the change affects only whitespace, line endings, or encoding.)
@@ -1,15 +1,12 @@
 ############## 数据库配置 ###############
 ##########################################
 #Druid配置
-#druid.url=jdbc:avatica:remote:url=http://10.111.200.180:8089/druid/v2/sql/avatica/
-# test
-druid.url=jdbc:avatica:remote:url=http://192.168.44.12:8082/druid/v2/sql/avatica/
+druid.url=jdbc:avatica:remote:url=http://192.168.44.12:8089/druid/v2/sql/avatica/
 druid.driver=org.apache.calcite.avatica.remote.Driver
-druid.table=traffic_top_destination_ip_metrics_log
+druid.table=dos_sketch_top_server_ip

 #HBase配置
 hbase.table=dos:ddos_traffic_baselines
-#hbase.zookeeper.quorum=10.111.200.165,10.111.200.166,10.111.200.167,10.111.200.168,10.111.200.169
 hbase.zookeeper.quorum=192.168.44.12
 hbase.zookeeper.client.port=2181

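The druid.url/druid.driver pair above is consumed through Avatica JDBC (the AvaticaConnection/AvaticaStatement imports appear earlier in this diff). A minimal sketch of opening such a connection; the hard-coded values and the placeholder query stand in for the project's ConfigUtils/DruidUtils wiring, which is not shown here:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class DruidSqlExample {
    public static void main(String[] args) throws Exception {
        // Values mirror the properties in this diff; the real project reads them
        // via ConfigUtils/ApplicationConfig rather than hard-coding them.
        String driver = "org.apache.calcite.avatica.remote.Driver";
        String url = "jdbc:avatica:remote:url=http://192.168.44.12:8089/druid/v2/sql/avatica/";

        Class.forName(driver);
        try (Connection conn = DriverManager.getConnection(url);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT 1")) {  // placeholder query
            while (rs.next()) {
                System.out.println(rs.getInt(1));
            }
        }
    }
}
```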
@@ -19,16 +16,18 @@ hbase.zookeeper.client.port=2181
 #读取druid时间范围方式,
 # 0:读取默认范围天数read.historical.days;
 # 1:指定时间范围
-read.druid.time.limit.type=0
-read.druid.min.time=1627747200000
-read.druid.max.time=1630425600000
+read.druid.time.limit.type=1
+read.druid.min.time=1711522800000
+read.druid.max.time=1711526400000

 #Druid字段映射
 druid.attacktype.tcpsynflood=TCP SYN Flood
 druid.attacktype.udpflood=UDP Flood
 druid.attacktype.icmpflood=ICMP Flood
 druid.attacktype.dnsamplification=DNS Flood
+druid.attacktype.ntpflood=NTP Flood
 druid.columnname.serverip=destination_ip
+druid.columnname.vsysid=vsys_id
 druid.columnname.attacktype=attack_type
 druid.columnname.recvtime=__time
 druid.columnname.partition.num=partition_num
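For reference, the new read.druid.min.time/read.druid.max.time pair spans exactly one hour: 1711522800000 and 1711526400000 correspond to 2024-03-27 07:00:00 and 08:00:00 UTC, while the replaced pair (1627747200000 to 1630425600000) covered 2021-07-31 16:00 UTC through 2021-08-31 16:00 UTC, roughly the month of August 2021 in UTC+8. With read.druid.time.limit.type switched to 1, this explicit window is used instead of the read.historical.days default described in the comments above.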
@@ -42,7 +41,7 @@ hbase.baseline.zero.replace.value.suffix=default_value

 #数据情况
 #读取历史N天数据,最小值为3天(需要判断周期性)
-read.historical.days=3
+read.historical.days=30
 #历史数据汇聚粒度为10分钟
 historical.grad=10
 # 数据库Time格式
@@ -76,9 +75,9 @@ monitor.frequency.bin.num=100
 ##########################################
 ################ 并发参数 #################
 ##########################################
-all.partition.num=100
-core.pool.size=10
-max.pool.size=10
+all.partition.num=1
+core.pool.size=1
+max.pool.size=1
 #druid分区字段partition_num的最大值为9999
 druid.statement.query.timeout=36000
 druid.partition.num.max=10000
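core.pool.size and max.pool.size suggest a standard ThreadPoolExecutor behind the partitioned baseline threads; the actual executor setup is not shown in this diff, so the following is only a hypothetical sketch of how values of 1/1 would serialize the work:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BaselinePoolSketch {
    public static void main(String[] args) {
        // Hypothetical wiring of core.pool.size / max.pool.size; the project's real
        // executor configuration is not part of this diff.
        int corePoolSize = 1;   // core.pool.size
        int maxPoolSize = 1;    // max.pool.size

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                corePoolSize, maxPoolSize,
                60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>());

        // With both sizes set to 1, at most one BaselineSingleThread-style task runs at a time.
        pool.submit(() -> System.out.println("processing one partition"));
        pool.shutdown();
    }
}
```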