写入生成类型

This commit is contained in:
yinjiangyi
2021-08-10 10:41:32 +08:00
parent 4e0c576477
commit d788ccc84f
4 changed files with 37 additions and 13 deletions

View File

@@ -23,6 +23,8 @@ public class ApplicationConfig {
public static final String TIME_FORMAT = ConfigUtils.getStringProperty("time.format"); public static final String TIME_FORMAT = ConfigUtils.getStringProperty("time.format");
public static final String BASELINE_METRIC_TYPE = ConfigUtils.getStringProperty("baseline.metric.type"); public static final String BASELINE_METRIC_TYPE = ConfigUtils.getStringProperty("baseline.metric.type");
public static final String HBASE_BASELINE_GENERATION_TYPE_SUFFIX = ConfigUtils.getStringProperty("hbase.baseline.generation.type.suffix");
public static final String DRUID_ATTACKTYPE_TCP_SYN_FLOOD = ConfigUtils.getStringProperty("druid.attacktype.tcpsynflood"); public static final String DRUID_ATTACKTYPE_TCP_SYN_FLOOD = ConfigUtils.getStringProperty("druid.attacktype.tcpsynflood");
public static final String DRUID_ATTACKTYPE_UDP_FLOOD = ConfigUtils.getStringProperty("druid.attacktype.udpflood"); public static final String DRUID_ATTACKTYPE_UDP_FLOOD = ConfigUtils.getStringProperty("druid.attacktype.udpflood");
public static final String DRUID_ATTACKTYPE_ICMP_FLOOD = ConfigUtils.getStringProperty("druid.attacktype.icmpflood"); public static final String DRUID_ATTACKTYPE_ICMP_FLOOD = ConfigUtils.getStringProperty("druid.attacktype.icmpflood");

View File

@@ -7,6 +7,8 @@ import cn.mesalab.utils.DruidUtils;
import cn.mesalab.utils.HbaseUtils; import cn.mesalab.utils.HbaseUtils;
import cn.mesalab.utils.RetryUtils; import cn.mesalab.utils.RetryUtils;
import cn.mesalab.utils.SeriesUtils; import cn.mesalab.utils.SeriesUtils;
import io.vavr.Tuple;
import io.vavr.Tuple2;
import org.apache.calcite.avatica.AvaticaClientRuntimeException; import org.apache.calcite.avatica.AvaticaClientRuntimeException;
import org.apache.calcite.avatica.AvaticaConnection; import org.apache.calcite.avatica.AvaticaConnection;
import org.apache.calcite.avatica.AvaticaStatement; import org.apache.calcite.avatica.AvaticaStatement;
@@ -86,9 +88,15 @@ public class BaselineSingleThread extends Thread {
List<Map<String, Object>> ipDruidData = batchDruidData.get(ip).stream() List<Map<String, Object>> ipDruidData = batchDruidData.get(ip).stream()
.filter(i -> i.get(ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME).equals(attackType)).collect(Collectors.toList()); .filter(i -> i.get(ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME).equals(attackType)).collect(Collectors.toList());
// baseline生成 // baseline生成
int[] ipBaseline = generateSingleIpBaseline(ip, ipDruidData); Tuple2<int[], Integer> tuple = generateSingleIpBaseline(ip, ipDruidData);
if ((ipBaseline!= null ) && (ip.length()>0)){ if(tuple!=null){
hbaseUtils.cachedInPut(putList, ip, ipBaseline, attackType, ApplicationConfig.BASELINE_METRIC_TYPE); int[] ipBaseline = tuple._1;
int generateType = tuple._2;
if ((ipBaseline!= null ) && (ip.length()>0)){
hbaseUtils.cachedInPut(putList, ip, ipBaseline, attackType, ApplicationConfig.BASELINE_METRIC_TYPE);
hbaseUtils.cachedInPut(putList, ip, generateType, attackType,
ApplicationConfig.BASELINE_METRIC_TYPE + "_" + ApplicationConfig.HBASE_BASELINE_GENERATION_TYPE_SUFFIX);
}
} }
} }
} }
@@ -119,24 +127,36 @@ public class BaselineSingleThread extends Thread {
} }
/** /**
* 单ip baseline生成逻辑 *
* @return baseline序列长度为 60/HISTORICAL_GRAD*24 * @return baseline序列长度为 60/HISTORICAL_GRAD*24
*/ */
private int[] generateSingleIpBaseline(String ip, List<Map<String, Object>> ipDruidData){ /**
* 单ip baseline生成逻辑
* @param ip
* @param ipDruidData
* @return baseline序列长度为 60/HISTORICAL_GRAD*24;
* baselineGenerationType:
* 1: 高频IP
* 2: 低频有周期IP
* 3: 其他类型IP, 采用百分位阈值基线
*/
private Tuple2<int[], Integer> generateSingleIpBaseline(String ip, List<Map<String, Object>> ipDruidData){
if (ipDruidData.size()==0){ if (ipDruidData.size()==0){
return null; return null;
} }
int baselineGenerationType = 0;
int[] baselineArr = new int[baselinePointNum];
// 时间序列缺失值补0 // 时间序列缺失值补0
List<Map<String, Object>> completSeries = SeriesUtils.complementSeries(ipDruidData); List<Map<String, Object>> completSeries = SeriesUtils.complementSeries(ipDruidData);
int[] baselineArr = new int[baselinePointNum];
List<Integer>series = completSeries.stream().map( List<Integer>series = completSeries.stream().map(
i -> Integer.valueOf(i.get(ApplicationConfig.BASELINE_METRIC_TYPE).toString())).collect(Collectors.toList()); i -> Integer.valueOf(i.get(ApplicationConfig.BASELINE_METRIC_TYPE).toString())).collect(Collectors.toList());
// 判断ip出现频率 // 判断ip出现频率
if(ipDruidData.size()/(float)completSeries.size()>ApplicationConfig.BASELINE_HISTORICAL_FREQUENCY_THREAD){ if(ipDruidData.size()/(float)completSeries.size()>ApplicationConfig.BASELINE_HISTORICAL_FREQUENCY_THREAD){
// 异常值剔除 // 异常值剔除
baselineGenerationType = 1;
double exceptionPercentile = SeriesUtils.percentile(series, ApplicationConfig.BASELINE_EXECEPTION_PERCENTILE); double exceptionPercentile = SeriesUtils.percentile(series, ApplicationConfig.BASELINE_EXECEPTION_PERCENTILE);
double exceptionFillPercentile = SeriesUtils.percentile(series, ApplicationConfig.BASELINE_EXCECPTION_FILL_PERCENTILE); double exceptionFillPercentile = SeriesUtils.percentile(series, ApplicationConfig.BASELINE_EXCECPTION_FILL_PERCENTILE);
for(int i=0; i<series.size(); i++){ for(int i=0; i<series.size(); i++){
@@ -146,25 +166,24 @@ public class BaselineSingleThread extends Thread {
} }
// KF // KF
baselineArr = baselineFunction(series); baselineArr = baselineFunction(series);
// System.out.println("高频IP:" + ip + " origin:" + series + "\n baseline:" + Arrays.toString(baselineArr));
} else { } else {
// 判断周期性 // 判断周期性
if (SeriesUtils.isPeriod(series)){ if (SeriesUtils.isPeriod(series)){
baselineGenerationType = 2;
// KF // KF
baselineArr = baselineFunction(series); baselineArr = baselineFunction(series);
// System.out.println("低频周期IP:" + ip + " origin:" + series + "\n baseline:" + Arrays.toString(baselineArr));
} else { } else {
baselineGenerationType = 3;
// 百分位数 // 百分位数
int ipPercentile = SeriesUtils.percentile( int ipPercentile = SeriesUtils.percentile(
ipDruidData.stream().map(i -> ipDruidData.stream().map(i ->
Integer.valueOf(i.get(ApplicationConfig.BASELINE_METRIC_TYPE).toString())).collect(Collectors.toList()), Integer.valueOf(i.get(ApplicationConfig.BASELINE_METRIC_TYPE).toString())).collect(Collectors.toList()),
ApplicationConfig.BASELINE_RATIONAL_PERCENTILE); ApplicationConfig.BASELINE_RATIONAL_PERCENTILE);
Arrays.fill(baselineArr, ipPercentile); Arrays.fill(baselineArr, ipPercentile);
// System.out.println("其他IP:" + ip + " origin:" + series + "\n baseline:" + Arrays.toString(baselineArr));
} }
} }
return baselineArr; return new Tuple2<>(baselineArr, baselineGenerationType);
} }
/** /**

View File

@@ -1 +1 @@
package cn.mesalab.utils; package cn.mesalab.utils;

View File

@@ -23,7 +23,7 @@ read.druid.min.time=1625414400000
#07-08 #07-08
read.druid.max.time=1625673600000 read.druid.max.time=1625673600000
#字段映射 #Druid字段映射
druid.attacktype.tcpsynflood=TCP SYN Flood druid.attacktype.tcpsynflood=TCP SYN Flood
druid.attacktype.udpflood=UDP Flood druid.attacktype.udpflood=UDP Flood
druid.attacktype.icmpflood=ICMP Flood druid.attacktype.icmpflood=ICMP Flood
@@ -34,6 +34,9 @@ druid.columnname.recvtime=__time
druid.columnname.partition.num=partition_num druid.columnname.partition.num=partition_num
baseline.metric.type=session_rate baseline.metric.type=session_rate
#Hbase字段映射
hbase.baseline.generation.type.suffix=baseline_type
#数据情况 #数据情况
#读取历史N天数据,最小值为3天(需要判断周期性) #读取历史N天数据,最小值为3天(需要判断周期性)
read.historical.days=3 read.historical.days=3