1 Commit

Author: 尹姜谊
SHA1: df43bb9e54
Message: Modify rowkey to add vsysid information
Date: 2022-09-21 09:48:50 +08:00
5 changed files with 54 additions and 44 deletions

View File: ApplicationConfig.java

@@ -32,6 +32,7 @@ public class ApplicationConfig {
     public static final String DRUID_ATTACKTYPE_ICMP_FLOOD = ConfigUtils.getStringProperty("druid.attacktype.icmpflood");
     public static final String DRUID_ATTACKTYPE_DNS_AMPL = ConfigUtils.getStringProperty("druid.attacktype.dnsamplification");
     public static final String DRUID_SERVERIP_COLUMN_NAME = ConfigUtils.getStringProperty("druid.columnname.serverip");
+    public static final String DRUID_VSYSID_COLUMN_NAME = ConfigUtils.getStringProperty("druid.columnname.vsysid");
     public static final String DRUID_ATTACKTYPE_COLUMN_NAME = ConfigUtils.getStringProperty("druid.columnname.attacktype");
     public static final String DRUID_RECVTIME_COLUMN_NAME = ConfigUtils.getStringProperty("druid.columnname.recvtime");
     public static final String DRUID_COLUMNNAME_PARTITION_NUM = ConfigUtils.getStringProperty("druid.columnname.partition.num");
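The new DRUID_VSYSID_COLUMN_NAME constant resolves to the druid.columnname.vsysid=vsys_id entry added to the properties file at the end of this commit. ConfigUtils itself is not shown in the diff; a minimal sketch of such a lookup, assuming a classpath properties file with the hypothetical name application.properties:

    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Properties;

    // Hypothetical stand-in for cn.mesalab.utils.ConfigUtils; the real class is not in this diff.
    public final class ConfigUtilsSketch {
        private static final Properties PROPS = new Properties();
        static {
            // "application.properties" is an assumed resource name.
            try (InputStream in = ConfigUtilsSketch.class.getClassLoader()
                    .getResourceAsStream("application.properties")) {
                if (in != null) {
                    PROPS.load(in);
                }
            } catch (IOException e) {
                throw new ExceptionInInitializerError(e);
            }
        }

        public static String getStringProperty(String key) {
            // e.g. getStringProperty("druid.columnname.vsysid") -> "vsys_id"
            return PROPS.getProperty(key);
        }
    }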

View File: DruidData.java

@@ -48,21 +48,23 @@ public class DruidData {
     }
     public static Map<String, List<Map<String, Object>>> selectAll(List<Map<String, Object>> result) {
-        Map<String, List<Map<String, Object>>> allIpDataList = new HashMap<>();
-        ArrayList<String> ipList = new ArrayList<>();
+        Map<String, List<Map<String, Object>>> allKeyDataList = new HashMap<>();
+        ArrayList<String> keyList = new ArrayList<>();
         for (Map<String, Object> rowData : result) {
             String ip = (String) rowData.get(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME);
-            if (!ipList.contains(ip)) {
-                ipList.add(ip);
-                List<Map<String, Object>> ipData = new ArrayList<>();
-                allIpDataList.put(ip, ipData);
+            String vsysId = Long.toString((Long) rowData.get(ApplicationConfig.DRUID_VSYSID_COLUMN_NAME));
+            String key = ip + "-" + vsysId;
+            if (!keyList.contains(key)) {
+                keyList.add(key);
+                List<Map<String, Object>> keyData = new ArrayList<>();
+                allKeyDataList.put(key, keyData);
             }
             rowData.remove(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME);
-            allIpDataList.get(ip).add(rowData);
+            allKeyDataList.get(key).add(rowData);
         }
-        return allIpDataList;
+        return allKeyDataList;
     }
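The rewritten selectAll now groups rows under an ip-vsysId compound key, but it still tracks seen keys with a linear keyList.contains() scan, so grouping is quadratic in the number of rows. A minimal sketch of the same grouping with Map.computeIfAbsent (not part of the commit; ApplicationConfig is the project class shown above):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Sketch only: equivalent grouping without the per-row keyList.contains() scan.
    public static Map<String, List<Map<String, Object>>> selectAllSketch(
            List<Map<String, Object>> result) {
        Map<String, List<Map<String, Object>>> allKeyDataList = new HashMap<>();
        for (Map<String, Object> rowData : result) {
            String ip = (String) rowData.get(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME);
            String vsysId = Long.toString((Long) rowData.get(ApplicationConfig.DRUID_VSYSID_COLUMN_NAME));
            String key = ip + "-" + vsysId;
            rowData.remove(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME);
            // creates the bucket on first sight of the key, then appends
            allKeyDataList.computeIfAbsent(key, k -> new ArrayList<>()).add(rowData);
        }
        return allKeyDataList;
    }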
@@ -158,9 +160,10 @@ public class DruidData {
+ ") AND " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME + ") AND " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
+ " < MILLIS_TO_TIMESTAMP(" + endTime + ")"; + " < MILLIS_TO_TIMESTAMP(" + endTime + ")";
return "SELECT "+ ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME String sql = "SELECT " + ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME
+ ", "+ ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME + ", " + ApplicationConfig.DRUID_VSYSID_COLUMN_NAME
+ ", AVG("+ ApplicationConfig.BASELINE_METRIC_TYPE + ") as " + ApplicationConfig.BASELINE_METRIC_TYPE + ", " + ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME
+ ", AVG(" + ApplicationConfig.BASELINE_METRIC_TYPE + ") as " + ApplicationConfig.BASELINE_METRIC_TYPE
+ ", " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME + ", " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
+ " FROM " + ApplicationConfig.DRUID_TABLE + " FROM " + ApplicationConfig.DRUID_TABLE
+ " WHERE " + ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME + " IN " + attackList + " WHERE " + ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME + " IN " + attackList
@@ -168,8 +171,11 @@ public class DruidData {
+ " AND " + timeFilter + " AND " + timeFilter
+ " AND " + partitionFilter + " AND " + partitionFilter
+ " GROUP BY (" + ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME + " GROUP BY (" + ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME
+ ", " + ApplicationConfig.DRUID_VSYSID_COLUMN_NAME
+ ", " + ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME + ", " + ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME
+ ", " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME + ")"; + ", " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME + ")";
System.out.println(sql);
return sql;
} }
/** /**
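Given the column mappings configured later in this commit (destination_ip, vsys_id, attack_type, __time) and the attack-type values from the same properties file, the System.out.println(sql) added above would print a query of roughly this shape. The table name, metric column, and the exact attack/partition/time filter values are not visible in this diff, so attack_table, attack_cnt, and the literals below are placeholders:

    SELECT destination_ip, vsys_id, attack_type, AVG(attack_cnt) as attack_cnt, __time
    FROM attack_table
    WHERE attack_type IN ('TCP SYN Flood', 'UDP Flood', 'ICMP Flood', 'DNS Flood')
      AND __time >= MILLIS_TO_TIMESTAMP(1663430400000)
      AND __time < MILLIS_TO_TIMESTAMP(1663603200000)
      AND partition_num = 0
    GROUP BY (destination_ip, vsys_id, attack_type, __time)

Adding vsys_id to both the SELECT list and the GROUP BY keeps the two in sync, since every non-aggregated output column has to appear in the grouping.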

View File: BaselineSingleThread.java

@@ -7,7 +7,6 @@ import cn.mesalab.utils.DruidUtils;
 import cn.mesalab.utils.HbaseUtils;
 import cn.mesalab.utils.RetryUtils;
 import cn.mesalab.utils.SeriesUtils;
-import com.google.common.collect.Lists;
 import io.vavr.Tuple3;
 import org.apache.calcite.avatica.AvaticaConnection;
 import org.apache.calcite.avatica.AvaticaStatement;
@@ -83,35 +82,38 @@ public class BaselineSingleThread extends Thread {
             batchDruidData = new HashMap<>();
         }
-        LOG.info("Finished data processing, Server IP count: " + batchDruidData.size() +
+        LOG.info("Finished data processing, Server IP + vsys_id count: " + batchDruidData.size() +
                 " elapsed ms: " + (System.currentTimeMillis() - start));
         // baseline generation
         List<Put> putList = new ArrayList<>();
         for(String attackType: attackTypeList){
-            for(String ip: batchDruidData.keySet()){
-                // select the data for the given ip and attack type
-                List<Map<String, Object>> ipDruidData = batchDruidData.get(ip).stream()
+            for(String key: batchDruidData.keySet()){
+                // select the data for the given key (ip + vsys_id) and attack type
+                List<Map<String, Object>> keyDruidData = batchDruidData.get(key).stream()
                         .filter(i -> i.get(ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME).equals(attackType)).collect(Collectors.toList());
                 // baseline generation
-                Tuple3<int[], Integer, Integer> tuple = generateSingleIpBaseline(ip, ipDruidData);
+                Tuple3<int[], Integer, Integer> tuple = generateSingleBaseline(key, keyDruidData);
                 if(tuple!=null){
-                    int[] ipBaseline = tuple._1;
+                    int[] baseline = tuple._1;
                     int generateType = tuple._2;
                     int zeroReplaceValue = tuple._3;
-                    if ((BASELINE_SAVE_LEVEL >= generateType) && (ipBaseline!= null ) && (ip.length()>0)){
-                        hbaseUtils.cachedInPut(putList, ip, ipBaseline, attackType, ApplicationConfig.BASELINE_METRIC_TYPE);
-                        hbaseUtils.cachedInPut(putList, ip, generateType, attackType,
+                    List<String> keys = Arrays.asList(key.split("-"));
+                    keys.remove("");
+                    if ((BASELINE_SAVE_LEVEL >= generateType) && (baseline!= null ) && (keys.size() == 2)){
+                        hbaseUtils.cachedInPut(putList, key, baseline, attackType, ApplicationConfig.BASELINE_METRIC_TYPE);
+                        hbaseUtils.cachedInPut(putList, key, generateType, attackType,
                                 ApplicationConfig.BASELINE_METRIC_TYPE + "_" + ApplicationConfig.HBASE_BASELINE_GENERATION_TYPE_SUFFIX);
-                        hbaseUtils.cachedInPut(putList, ip, zeroReplaceValue, attackType,
+                        hbaseUtils.cachedInPut(putList, key, zeroReplaceValue, attackType,
                                 ApplicationConfig.BASELINE_METRIC_TYPE + "_" + ApplicationConfig.HBASE_BASELINE_ZERO_REPLACE_VALUE_SUFFIX);
                     }
                 }
             }
         }
         try {
-            LOG.info("MONITOR-IP frequency bin stats: " + frequencyBinCounter);
+            LOG.info("MONITOR-IP-vsysID frequency bin stats: " + frequencyBinCounter);
             LOG.info("MONITOR-generation type stats: " + generateTypeCounter);
             LOG.info("MONITOR-count without generated baseline: " + discardBaselineCounter + ", IPs involved: " + discardIpList.size());
             hbaseTable.put(putList);
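One caveat in the new key handling above: Arrays.asList(key.split("-")) returns a fixed-size list, and its remove(Object) throws UnsupportedOperationException when the element is actually found, which here is exactly the empty ip or vsys_id case the keys.size() == 2 check is meant to reject. A defensive sketch (not the committed code) that preserves the intent:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Copy into a mutable ArrayList so remove("") is safe even when an
    // empty component is present. split() already drops trailing empties,
    // so this also covers a leading empty ip component.
    List<String> keys = new ArrayList<>(Arrays.asList(key.split("-")));
    keys.remove("");                   // drop an empty ip or vsys_id component
    boolean valid = keys.size() == 2;  // ip and vsys_id both present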
@@ -151,36 +153,36 @@ public class BaselineSingleThread extends Thread {
     /**
      * baseline generation logic for a single ip
-     * @param ip
-     * @param ipDruidData
+     * @param key
+     * @param keyDruidData
      * @return baseline series of length 60/HISTORICAL_GRAD*24;
      *         baselineGenerationType:
-     *         1: high-frequency IP
-     *         2: low-frequency IP with periodicity
-     *         3: other IPs, use the percentile-threshold baseline
+     *         1: high-frequency key
+     *         2: low-frequency key with periodicity
+     *         3: other keys, use the percentile-threshold baseline
      */
-    private Tuple3<int[], Integer, Integer> generateSingleIpBaseline(String ip, List<Map<String, Object>> ipDruidData){
-        // no data for this (ip, attack type): skip
-        if (ipDruidData.size()==0){
-            updateDiscardCounter(ip);
+    private Tuple3<int[], Integer, Integer> generateSingleBaseline(String key, List<Map<String, Object>> keyDruidData){
+        // no data: skip
+        if (keyDruidData.size()==0){
+            updateDiscardCounter(key);
             return null;
         }
-        List<Integer> originSeries = ipDruidData.stream().map(i ->
+        List<Integer> originSeries = keyDruidData.stream().map(i ->
                 Integer.valueOf(i.get(ApplicationConfig.BASELINE_METRIC_TYPE).toString())).collect(Collectors.toList());
         List<Integer> originNonZeroSeries = originSeries.stream().filter(i->i>0).collect(Collectors.toList());
         // all-zero (ip, attack type) series: skip
         if(originNonZeroSeries.size()==0){
-            updateDiscardCounter(ip);
+            updateDiscardCounter(key);
             return null;
         }
-        int ipPercentile = SeriesUtils.percentile(originSeries, ApplicationConfig.BASELINE_RATIONAL_PERCENTILE);
+        int percentile = SeriesUtils.percentile(originSeries, ApplicationConfig.BASELINE_RATIONAL_PERCENTILE);
         int baselineGenerationType;
         int[] baselineArr = new int[baselinePointNum];
         // fill gaps in the time series with 0
-        List<Map<String, Object>> completSeries = SeriesUtils.complementSeries(ipDruidData);
+        List<Map<String, Object>> completSeries = SeriesUtils.complementSeries(keyDruidData);
         List<Integer>series = completSeries.stream().map(
                 i -> Integer.valueOf(i.get(ApplicationConfig.BASELINE_METRIC_TYPE).toString())).collect(Collectors.toList());
@@ -190,7 +192,7 @@ public class BaselineSingleThread extends Thread {
         // outlier removal
         double exceptionPercentile = SeriesUtils.percentile(series, ApplicationConfig.BASELINE_EXECEPTION_PERCENTILE);
         double exceptionFillPercentile = SeriesUtils.percentile(series, ApplicationConfig.BASELINE_EXCECPTION_FILL_PERCENTILE);
-        LOG.debug(ip + ": series-" + series);
+        LOG.debug(key + ": series-" + series);
         for(int i=0; i<series.size(); i++){
             if(series.get(i) > exceptionPercentile){
                 series.set(i, (int) exceptionFillPercentile);
@@ -203,7 +205,7 @@ public class BaselineSingleThread extends Thread {
         double p50 = SeriesUtils.percentile(series, 0.50);
         // no periodicity
-        float ipFrequency = ipDruidData.size() / (float) completSeries.size();
+        float ipFrequency = keyDruidData.size() / (float) completSeries.size();
         updateLogFrequencyCounter(ipFrequency);
         // frequency check
         if(ipFrequency >ApplicationConfig.BASELINE_HISTORICAL_FREQUENCY_THREAD ){
@@ -220,7 +222,7 @@ public class BaselineSingleThread extends Thread {
         // default value: percentile of the non-zero data
         int defaultValue = SeriesUtils.percentile(originNonZeroSeries, ApplicationConfig.BASELINE_RATIONAL_PERCENTILE);
         if(defaultValue == 0){
-            LOG.error(ip + "-" + "baseline default value is 0 !");
+            LOG.error(key + "-" + "baseline default value is 0 !");
         }
         return new Tuple3<>(baselineArr, baselineGenerationType, defaultValue);
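The generation logic above leans on SeriesUtils.percentile for the rational, exception, and fill percentiles; its implementation is not part of this diff. A minimal nearest-rank sketch, assuming from the percentile(series, 0.50) call above that the second argument is a fraction in [0, 1]:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    // Nearest-rank percentile sketch; SeriesUtils.percentile may differ in detail.
    static int percentileSketch(List<Integer> series, double p) {
        if (series.isEmpty()) {
            return 0; // callers above already skip empty series
        }
        List<Integer> sorted = new ArrayList<>(series);
        Collections.sort(sorted);
        int idx = (int) Math.ceil(p * sorted.size()) - 1;
        return sorted.get(Math.max(0, Math.min(idx, sorted.size() - 1)));
    }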

View File

@@ -1 +1 @@
-package cn.mesalab.utils;
+package cn.mesalab.utils;

View File

@@ -19,9 +19,9 @@ hbase.zookeeper.client.port=2181
 # how the druid read window is chosen
 # 0: read the default number of days back (read.historical.days)
 # 1: use the explicit time range below
-read.druid.time.limit.type=0
-read.druid.min.time=1627747200000
-read.druid.max.time=1630425600000
+read.druid.time.limit.type=1
+read.druid.min.time=1663430400000
+read.druid.max.time=1663603200000
 # Druid column name mapping
 druid.attacktype.tcpsynflood=TCP SYN Flood
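The switch to read.druid.time.limit.type=1 pins the read window to an explicit two-day range. Decoded in the +08:00 zone of the commit timestamp, the new bounds are 2022-09-18 00:00 and 2022-09-20 00:00 (the old ones covered 2021-08-01 to 2021-09-01). A quick check, with Asia/Shanghai as the assumed zone name:

    import java.time.Instant;
    import java.time.ZoneId;

    // Decodes the configured epoch-millisecond bounds; prints
    // 2022-09-18T00:00+08:00[Asia/Shanghai] and 2022-09-20T00:00+08:00[Asia/Shanghai].
    public class TimeRangeCheck {
        public static void main(String[] args) {
            ZoneId zone = ZoneId.of("Asia/Shanghai");
            System.out.println(Instant.ofEpochMilli(1663430400000L).atZone(zone));
            System.out.println(Instant.ofEpochMilli(1663603200000L).atZone(zone));
        }
    }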
@@ -29,6 +29,7 @@ druid.attacktype.udpflood=UDP Flood
 druid.attacktype.icmpflood=ICMP Flood
 druid.attacktype.dnsamplification=DNS Flood
 druid.columnname.serverip=destination_ip
+druid.columnname.vsysid=vsys_id
 druid.columnname.attacktype=attack_type
 druid.columnname.recvtime=__time
 druid.columnname.partition.num=partition_num
@@ -76,7 +77,7 @@ monitor.frequency.bin.num=100
 ##########################################
 ############ concurrency settings ########
 ##########################################
-all.partition.num=100
+all.partition.num=10
 core.pool.size=10
 max.pool.size=10
 # the druid partition column partition_num has a maximum value of 9999