增加字段*_zero_replace_value

This commit is contained in:
yinjiangyi
2021-08-17 14:44:20 +08:00
parent e9d045b04e
commit b5645b72ed
4 changed files with 26 additions and 18 deletions

View File

@@ -24,6 +24,7 @@ public class ApplicationConfig {
public static final String BASELINE_METRIC_TYPE = ConfigUtils.getStringProperty("baseline.metric.type"); public static final String BASELINE_METRIC_TYPE = ConfigUtils.getStringProperty("baseline.metric.type");
public static final String HBASE_BASELINE_GENERATION_TYPE_SUFFIX = ConfigUtils.getStringProperty("hbase.baseline.generation.type.suffix"); public static final String HBASE_BASELINE_GENERATION_TYPE_SUFFIX = ConfigUtils.getStringProperty("hbase.baseline.generation.type.suffix");
public static final String HBASE_BASELINE_ZERO_REPLACE_VALUE_SUFFIX = ConfigUtils.getStringProperty("hbase.baseline.zero.replace.value.suffix");
public static final String DRUID_ATTACKTYPE_TCP_SYN_FLOOD = ConfigUtils.getStringProperty("druid.attacktype.tcpsynflood"); public static final String DRUID_ATTACKTYPE_TCP_SYN_FLOOD = ConfigUtils.getStringProperty("druid.attacktype.tcpsynflood");
public static final String DRUID_ATTACKTYPE_UDP_FLOOD = ConfigUtils.getStringProperty("druid.attacktype.udpflood"); public static final String DRUID_ATTACKTYPE_UDP_FLOOD = ConfigUtils.getStringProperty("druid.attacktype.udpflood");
@@ -40,7 +41,7 @@ public class ApplicationConfig {
public static final Float BASELINE_HISTORICAL_FREQUENCY_THREAD = ConfigUtils.getFloatProperty("baseline.historical.frequency.thread"); public static final Float BASELINE_HISTORICAL_FREQUENCY_THREAD = ConfigUtils.getFloatProperty("baseline.historical.frequency.thread");
// 异常值判断分位数 // 异常值判断分位数
public static final Float BASELINE_EXECEPTION_PERCENTILE = ConfigUtils.getFloatProperty("baseline.exception.percentile"); public static final Float BASELINE_EXECEPTION_PERCENTILE = ConfigUtils.getFloatProperty("baseline.exception.percentile");
// 异常值替换百分位 // 异常值百分位
public static final Float BASELINE_EXCECPTION_FILL_PERCENTILE = ConfigUtils.getFloatProperty("baseline.exception.fill.percentile"); public static final Float BASELINE_EXCECPTION_FILL_PERCENTILE = ConfigUtils.getFloatProperty("baseline.exception.fill.percentile");
public static final String BASELINE_FUNCTION = ConfigUtils.getStringProperty("baseline.function"); public static final String BASELINE_FUNCTION = ConfigUtils.getStringProperty("baseline.function");
public static final Integer BASELINE_RANGE_DAYS = ConfigUtils.getIntProperty("baseline.range.days"); public static final Integer BASELINE_RANGE_DAYS = ConfigUtils.getIntProperty("baseline.range.days");

View File

@@ -7,12 +7,10 @@ import cn.mesalab.utils.DruidUtils;
import cn.mesalab.utils.HbaseUtils; import cn.mesalab.utils.HbaseUtils;
import cn.mesalab.utils.RetryUtils; import cn.mesalab.utils.RetryUtils;
import cn.mesalab.utils.SeriesUtils; import cn.mesalab.utils.SeriesUtils;
import io.vavr.Tuple;
import io.vavr.Tuple2; import io.vavr.Tuple2;
import org.apache.calcite.avatica.AvaticaClientRuntimeException; import io.vavr.Tuple3;
import org.apache.calcite.avatica.AvaticaConnection; import org.apache.calcite.avatica.AvaticaConnection;
import org.apache.calcite.avatica.AvaticaStatement; import org.apache.calcite.avatica.AvaticaStatement;
import org.apache.commons.math3.stat.StatUtils;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.slf4j.Logger; import org.slf4j.Logger;
@@ -88,14 +86,19 @@ public class BaselineSingleThread extends Thread {
List<Map<String, Object>> ipDruidData = batchDruidData.get(ip).stream() List<Map<String, Object>> ipDruidData = batchDruidData.get(ip).stream()
.filter(i -> i.get(ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME).equals(attackType)).collect(Collectors.toList()); .filter(i -> i.get(ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME).equals(attackType)).collect(Collectors.toList());
// baseline生成 // baseline生成
Tuple2<int[], Integer> tuple = generateSingleIpBaseline(ip, ipDruidData); Tuple3<int[], Integer, Integer> tuple = generateSingleIpBaseline(ip, ipDruidData);
if(tuple!=null){ if(tuple!=null){
int[] ipBaseline = tuple._1; int[] ipBaseline = tuple._1;
int generateType = tuple._2; int generateType = tuple._2;
int zeroReplaceValue = tuple._3;
if ((ipBaseline!= null ) && (ip.length()>0)){ if ((ipBaseline!= null ) && (ip.length()>0)){
hbaseUtils.cachedInPut(putList, ip, ipBaseline, attackType, ApplicationConfig.BASELINE_METRIC_TYPE); hbaseUtils.cachedInPut(putList, ip, ipBaseline, attackType, ApplicationConfig.BASELINE_METRIC_TYPE);
hbaseUtils.cachedInPut(putList, ip, generateType, attackType, hbaseUtils.cachedInPut(putList, ip, generateType, attackType,
ApplicationConfig.BASELINE_METRIC_TYPE + "_" + ApplicationConfig.HBASE_BASELINE_GENERATION_TYPE_SUFFIX); ApplicationConfig.BASELINE_METRIC_TYPE + "_" + ApplicationConfig.HBASE_BASELINE_GENERATION_TYPE_SUFFIX);
hbaseUtils.cachedInPut(putList, ip, generateType, attackType,
ApplicationConfig.BASELINE_METRIC_TYPE + "_" + ApplicationConfig.HBASE_BASELINE_GENERATION_TYPE_SUFFIX);
hbaseUtils.cachedInPut(putList, ip, zeroReplaceValue, attackType,
ApplicationConfig.BASELINE_METRIC_TYPE + "_" + ApplicationConfig.HBASE_BASELINE_ZERO_REPLACE_VALUE_SUFFIX);
} }
} }
} }
@@ -136,11 +139,17 @@ public class BaselineSingleThread extends Thread {
* 2: 低频有周期IP * 2: 低频有周期IP
* 3: 其他类型IP, 采用百分位阈值基线 * 3: 其他类型IP, 采用百分位阈值基线
*/ */
private Tuple2<int[], Integer> generateSingleIpBaseline(String ip, List<Map<String, Object>> ipDruidData){ private Tuple3<int[], Integer, Integer> generateSingleIpBaseline(String ip, List<Map<String, Object>> ipDruidData){
if (ipDruidData.size()==0){ if (ipDruidData.size()==0){
return null; return null;
} }
List<Integer> originSeries = ipDruidData.stream().map(i ->
Integer.valueOf(i.get(ApplicationConfig.BASELINE_METRIC_TYPE).toString())).collect(Collectors.toList());
if(Collections.max(originSeries)==0){
return null;
}
int baselineGenerationType = 0; int baselineGenerationType = 0;
int[] baselineArr = new int[baselinePointNum]; int[] baselineArr = new int[baselinePointNum];
@@ -148,9 +157,11 @@ public class BaselineSingleThread extends Thread {
List<Map<String, Object>> completSeries = SeriesUtils.complementSeries(ipDruidData); List<Map<String, Object>> completSeries = SeriesUtils.complementSeries(ipDruidData);
List<Integer>series = completSeries.stream().map( List<Integer>series = completSeries.stream().map(
i -> Integer.valueOf(i.get(ApplicationConfig.BASELINE_METRIC_TYPE).toString())).collect(Collectors.toList()); i -> Integer.valueOf(i.get(ApplicationConfig.BASELINE_METRIC_TYPE).toString())).collect(Collectors.toList());
int ipPercentile = SeriesUtils.percentile(originSeries, ApplicationConfig.BASELINE_RATIONAL_PERCENTILE);
// 判断ip出现频率 // 判断ip出现频率
if(ipDruidData.size()/(float)completSeries.size()>ApplicationConfig.BASELINE_HISTORICAL_FREQUENCY_THREAD){ float ipFrequency = ipDruidData.size() / (float) completSeries.size();
if(ipFrequency >ApplicationConfig.BASELINE_HISTORICAL_FREQUENCY_THREAD){
// 异常值剔除 // 异常值剔除
baselineGenerationType = 1; baselineGenerationType = 1;
double exceptionPercentile = SeriesUtils.percentile(series, ApplicationConfig.BASELINE_EXECEPTION_PERCENTILE); double exceptionPercentile = SeriesUtils.percentile(series, ApplicationConfig.BASELINE_EXECEPTION_PERCENTILE);
@@ -162,27 +173,22 @@ public class BaselineSingleThread extends Thread {
} }
// KF // KF
baselineArr = baselineFunction(series); baselineArr = baselineFunction(series);
System.out.println("type-01" + ip + " " + Arrays.toString(baselineArr)); // System.out.println("type-01" + ipPercentile + " " + ip + " " + Arrays.toString(baselineArr));
} else { } else {
// 判断周期性 // 判断周期性
if (SeriesUtils.isPeriod(series)){ if (SeriesUtils.isPeriod(series)){
baselineGenerationType = 2; baselineGenerationType = 2;
// KF // KF
baselineArr = baselineFunction(series); baselineArr = baselineFunction(series);
System.out.println("type-02" + ip + " " + Arrays.toString(baselineArr)); // System.out.println("type-02" + ipPercentile + " " + ip + " " + Arrays.toString(baselineArr));
} else { } else {
baselineGenerationType = 3; baselineGenerationType = 3;
// 百分位数
int ipPercentile = SeriesUtils.percentile(
ipDruidData.stream().map(i ->
Integer.valueOf(i.get(ApplicationConfig.BASELINE_METRIC_TYPE).toString())).collect(Collectors.toList()),
ApplicationConfig.BASELINE_RATIONAL_PERCENTILE);
Arrays.fill(baselineArr, ipPercentile); Arrays.fill(baselineArr, ipPercentile);
// System.out.println("type-03" + ip + " " + Arrays.toString(baselineArr)); // System.out.println("type-03" + ipPercentile + " " + ip + " " + Arrays.toString(baselineArr));
} }
} }
return new Tuple2<>(baselineArr, baselineGenerationType); return new Tuple3<>(baselineArr, baselineGenerationType, ipPercentile);
} }
/** /**

View File

@@ -1 +1 @@
package cn.mesalab.utils; package cn.mesalab.utils;

View File

@@ -36,6 +36,7 @@ baseline.metric.type=session_rate
#Hbase字段映射 #Hbase字段映射
hbase.baseline.generation.type.suffix=baseline_type hbase.baseline.generation.type.suffix=baseline_type
hbase.baseline.zero.replace.value.suffix=zero_replace_value
#数据情况 #数据情况
#读取历史N天数据，最小值为3天，需要判断周期性 #读取历史N天数据，最小值为3天，需要判断周期性
@@ -51,7 +52,7 @@ time.format=yyyy-MM-dd HH:mm:ss
baseline.range.days=1 baseline.range.days=1
baseline.function=KalmanFilter baseline.function=KalmanFilter
baseline.period.correlative.threshold=0.5 baseline.period.correlative.threshold=0.5
baseline.historical.frequency.thread=0.1 baseline.historical.frequency.thread=0.2
baseline.exception.percentile=0.99 baseline.exception.percentile=0.99
baseline.exception.fill.percentile=0.99 baseline.exception.fill.percentile=0.99
baseline.rational.percentile=0.95 baseline.rational.percentile=0.95