@@ -28,7 +28,7 @@ public class BaselineSingleThread extends Thread {
private final Table hbaseTable ;
private final List < String > attackTypeList ;
private final Integer historical PointNum;
private final Integer baseline PointNum;
private final Map < String , List < Map < String , Object > > > batchDruidData ;
private final CountDownLatch countDownLatch ;
@@ -41,7 +41,7 @@ public class BaselineSingleThread extends Thread {
) {
this . hbaseTable = hbaseTable ;
this . attackTypeList = attackTypeList ;
this . historical PointNum = baselinePointNum ;
this . baseline PointNum = baselinePointNum ;
this . batchDruidData = batchDruidData ;
this . countDownLatch = countDownLatch ;
}
@@ -55,7 +55,7 @@ public class BaselineSingleThread extends Thread {
List < Map < String , Object > > ipDruidData = batchDruidData . get ( ip ) . stream ( )
. filter ( i - > i . get ( ApplicationConfig . DRUID_ATTACKTYPE_COLUMN_NAME ) . equals ( attackType ) ) . collect ( Collectors . toList ( ) ) ;
// baseline生成
int [ ] ipBaseline = generateSingleIpBaseline ( ipDruidData ) ;
int [ ] ipBaseline = generateSingleIpBaseline ( ip , ipDruidData) ;
if ( ipBaseline ! = null ) {
HbaseUtils . cachedInPut ( putList , ip , ipBaseline , attackType , ApplicationConfig . BASELINE_METRIC_TYPE ) ;
}
@@ -75,7 +75,7 @@ public class BaselineSingleThread extends Thread {
* 单ip baseline生成逻辑
* @return baseline序列, 长度为 60/HISTORICAL_GRAD*24
*/
private int [ ] generateSingleIpBaseline ( List < Map < String , Object > > ipDruidData ) {
private int [ ] generateSingleIpBaseline ( String ip , List< Map < String , Object > > ipDruidData ) {
if ( ipDruidData . size ( ) = = 0 ) {
return null ;
}
@@ -83,22 +83,25 @@ public class BaselineSingleThread extends Thread {
// 时间序列缺失值补0
List < Map < String , Object > > completSeries = SeriesUtils . complementSeries ( ipDruidData ) ;
int [ ] baselineArr = new int [ historical PointNum] ;
int [ ] baselineArr = new int [ baseline PointNum] ;
List < Integer > series = completSeries . stream ( ) . map (
i - > Integer . valueOf ( i . get ( ApplicationConfig . BASELINE_METRIC_TYPE ) . toString ( ) ) ) . collect ( Collectors . toList ( ) ) ;
// 判断ip出现频率
if ( ipDruidData . size ( ) / ( float ) completSeries . size ( ) > ApplicationConfig . BASELINE_HISTORICAL_RATIO ) {
// 高频率
// 异常值剔除
double percentile = StatUtils . percentile ( series . stream ( ) . mapToDouble ( Double : : valueOf ) . toArray ( ) ,
ApplicationConfig . BASELINE_SPARSE_FILL_PERCENTILE ) ;
Arrays . fill ( baselineArr , ( int ) percentile ) ;
// KF
baselineArr = baselineFunction ( series ) ;
} else {
// 判断周期性
if ( SeriesUtils . isPeriod ( series ) ) {
// KF
baselineArr = baselineFunction ( series ) ;
} else {
// 百分位数
int ipPercentile = SeriesUtils . percentile (
ipDruidData . stream ( ) . map ( i - >
Integer . valueOf ( i . get ( ApplicationConfig . BASELINE_METRIC_TYPE ) . toString ( ) ) ) . collect ( Collectors . toList ( ) ) ,
@@ -120,11 +123,11 @@ public class BaselineSingleThread extends Thread {
switch ( ApplicationConfig . BASELINE_FUNCTION ) {
case " KalmanFilter " :
KalmanFilter kalmanFilter = new KalmanFilter ( ) ;
kalmanFilter . forcast ( timeSeries , historical PointNum) ;
kalmanFilter . forcast ( timeSeries , baseline PointNum) ;
result = kalmanFilter . getForecastSeries ( ) . stream ( ) . mapToInt ( Integer : : valueOf ) . toArray ( ) ;
break ;
default :
result = timeSeries . subList ( 0 , historical PointNum) . stream ( ) . mapToInt ( Integer : : valueOf ) . toArray ( ) ;
result = timeSeries . subList ( 0 , baseline PointNum) . stream ( ) . mapToInt ( Integer : : valueOf ) . toArray ( ) ;
}
return result ;
}