diff --git a/src/main/java/cn/mesalab/config/ApplicationConfig.java b/src/main/java/cn/mesalab/config/ApplicationConfig.java index 1f88024..2423656 100644 --- a/src/main/java/cn/mesalab/config/ApplicationConfig.java +++ b/src/main/java/cn/mesalab/config/ApplicationConfig.java @@ -23,6 +23,8 @@ public class ApplicationConfig { public static final String TIME_FORMAT = ConfigUtils.getStringProperty("time.format"); public static final String BASELINE_METRIC_TYPE = ConfigUtils.getStringProperty("baseline.metric.type"); + public static final String HBASE_BASELINE_GENERATION_TYPE_SUFFIX = ConfigUtils.getStringProperty("hbase.baseline.generation.type.suffix"); + public static final String DRUID_ATTACKTYPE_TCP_SYN_FLOOD = ConfigUtils.getStringProperty("druid.attacktype.tcpsynflood"); public static final String DRUID_ATTACKTYPE_UDP_FLOOD = ConfigUtils.getStringProperty("druid.attacktype.udpflood"); public static final String DRUID_ATTACKTYPE_ICMP_FLOOD = ConfigUtils.getStringProperty("druid.attacktype.icmpflood"); diff --git a/src/main/java/cn/mesalab/service/BaselineSingleThread.java b/src/main/java/cn/mesalab/service/BaselineSingleThread.java index 99210a3..719f862 100644 --- a/src/main/java/cn/mesalab/service/BaselineSingleThread.java +++ b/src/main/java/cn/mesalab/service/BaselineSingleThread.java @@ -7,6 +7,8 @@ import cn.mesalab.utils.DruidUtils; import cn.mesalab.utils.HbaseUtils; import cn.mesalab.utils.RetryUtils; import cn.mesalab.utils.SeriesUtils; +import io.vavr.Tuple; +import io.vavr.Tuple2; import org.apache.calcite.avatica.AvaticaClientRuntimeException; import org.apache.calcite.avatica.AvaticaConnection; import org.apache.calcite.avatica.AvaticaStatement; @@ -86,9 +88,15 @@ public class BaselineSingleThread extends Thread { List> ipDruidData = batchDruidData.get(ip).stream() .filter(i -> i.get(ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME).equals(attackType)).collect(Collectors.toList()); // baseline生成 - int[] ipBaseline = generateSingleIpBaseline(ip, ipDruidData); - if ((ipBaseline!= null ) && (ip.length()>0)){ - hbaseUtils.cachedInPut(putList, ip, ipBaseline, attackType, ApplicationConfig.BASELINE_METRIC_TYPE); + Tuple2 tuple = generateSingleIpBaseline(ip, ipDruidData); + if(tuple!=null){ + int[] ipBaseline = tuple._1; + int generateType = tuple._2; + if ((ipBaseline!= null ) && (ip.length()>0)){ + hbaseUtils.cachedInPut(putList, ip, ipBaseline, attackType, ApplicationConfig.BASELINE_METRIC_TYPE); + hbaseUtils.cachedInPut(putList, ip, generateType, attackType, + ApplicationConfig.BASELINE_METRIC_TYPE + "_" + ApplicationConfig.HBASE_BASELINE_GENERATION_TYPE_SUFFIX); + } } } } @@ -119,24 +127,36 @@ public class BaselineSingleThread extends Thread { } /** - * 单ip baseline生成逻辑 + * * @return baseline序列,长度为 60/HISTORICAL_GRAD*24 */ - private int[] generateSingleIpBaseline(String ip, List> ipDruidData){ + /** + * 单ip baseline生成逻辑 + * @param ip + * @param ipDruidData + * @return baseline序列,长度为 60/HISTORICAL_GRAD*24; + * baselineGenerationType: + * 1: 高频IP + * 2: 低频有周期IP + * 3:其他类型IP, 采用百分位阈值基线 + */ + private Tuple2 generateSingleIpBaseline(String ip, List> ipDruidData){ if (ipDruidData.size()==0){ return null; } + int baselineGenerationType = 0; + int[] baselineArr = new int[baselinePointNum]; + // 时间序列缺失值补0 List> completSeries = SeriesUtils.complementSeries(ipDruidData); - - int[] baselineArr = new int[baselinePointNum]; Listseries = completSeries.stream().map( i -> Integer.valueOf(i.get(ApplicationConfig.BASELINE_METRIC_TYPE).toString())).collect(Collectors.toList()); // 判断ip出现频率 if(ipDruidData.size()/(float)completSeries.size()>ApplicationConfig.BASELINE_HISTORICAL_FREQUENCY_THREAD){ // 异常值剔除 + baselineGenerationType = 1; double exceptionPercentile = SeriesUtils.percentile(series, ApplicationConfig.BASELINE_EXECEPTION_PERCENTILE); double exceptionFillPercentile = SeriesUtils.percentile(series, ApplicationConfig.BASELINE_EXCECPTION_FILL_PERCENTILE); for(int i=0; i Integer.valueOf(i.get(ApplicationConfig.BASELINE_METRIC_TYPE).toString())).collect(Collectors.toList()), ApplicationConfig.BASELINE_RATIONAL_PERCENTILE); Arrays.fill(baselineArr, ipPercentile); - // System.out.println("其他IP:" + ip + " origin:" + series + "\n baseline:" + Arrays.toString(baselineArr)); } } - return baselineArr; + return new Tuple2<>(baselineArr, baselineGenerationType); } /** diff --git a/src/main/java/cn/mesalab/utils/HbaseUtils.java b/src/main/java/cn/mesalab/utils/HbaseUtils.java index a2657b7..28dfcf3 100644 --- a/src/main/java/cn/mesalab/utils/HbaseUtils.java +++ b/src/main/java/cn/mesalab/utils/HbaseUtils.java @@ -1 +1 @@ -package cn.mesalab.utils; import cn.mesalab.config.ApplicationConfig; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; import java.util.List; /** * @author yjy * @version 1.0 * @date 2021/7/23 4:56 下午 */ public class HbaseUtils { private static final Logger LOG = LoggerFactory.getLogger(HbaseUtils.class); private static HbaseUtils hbaseUtils; private Table hbaseTable; static { hbaseUtils = HbaseUtils.getInstance(); } public static HbaseUtils getInstance(){ if (hbaseUtils == null) { hbaseUtils = new HbaseUtils(); } return hbaseUtils; } public Table getHbaseTable(){ if(hbaseTable == null){ try{ Configuration config = HBaseConfiguration.create(); config.set(HConstants.ZOOKEEPER_QUORUM, ApplicationConfig.HBASE_ZOOKEEPER_QUORUM); config.set(HConstants.ZOOKEEPER_CLIENT_PORT, ApplicationConfig.HBASE_ZOOKEEPER_CLIENT_PORT); TableName tableName = TableName.valueOf(ApplicationConfig.HBASE_TABLE); Connection conn = ConnectionFactory.createConnection(config); hbaseTable = conn.getTable(tableName); } catch (IOException e){ LOG.error("HBase 创建HBase table失败!"); e.printStackTrace(); } } return hbaseTable; } public List cachedInPut(List putList, String ip, int[] baseline, String attackType, String metricType){ Put rowPut = new Put(Bytes.toBytes(ip)); rowPut.addColumn( Bytes.toBytes(attackType), Bytes.toBytes(metricType), WritableUtils.toByteArray(toWritable(baseline))); putList.add(rowPut); return putList; } private Writable toWritable(int[] arr) { Writable[] content = new Writable[arr.length]; for (int i = 0; i < content.length; i++) { content[i] = new IntWritable(arr[i]); } return new ArrayWritable(IntWritable.class, content); } public ArrayList fromWritable(ArrayWritable writable) { Writable[] writables = ((ArrayWritable) writable).get(); ArrayList list = new ArrayList(writables.length); for (Writable wrt : writables) { list.add(((IntWritable)wrt).get()); } return list; } public void close(){ try { hbaseTable.close(); } catch (IOException e) { e.printStackTrace(); } } } \ No newline at end of file +package cn.mesalab.utils; import cn.mesalab.config.ApplicationConfig; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; import java.util.List; /** * @author yjy * @version 1.0 * @date 2021/7/23 4:56 下午 */ public class HbaseUtils { private static final Logger LOG = LoggerFactory.getLogger(HbaseUtils.class); private static HbaseUtils hbaseUtils; private Table hbaseTable; static { hbaseUtils = HbaseUtils.getInstance(); } public static HbaseUtils getInstance(){ if (hbaseUtils == null) { hbaseUtils = new HbaseUtils(); } return hbaseUtils; } public Table getHbaseTable(){ if(hbaseTable == null){ try{ Configuration config = HBaseConfiguration.create(); config.set(HConstants.ZOOKEEPER_QUORUM, ApplicationConfig.HBASE_ZOOKEEPER_QUORUM); config.set(HConstants.ZOOKEEPER_CLIENT_PORT, ApplicationConfig.HBASE_ZOOKEEPER_CLIENT_PORT); TableName tableName = TableName.valueOf(ApplicationConfig.HBASE_TABLE); Connection conn = ConnectionFactory.createConnection(config); hbaseTable = conn.getTable(tableName); } catch (IOException e){ LOG.error("HBase 创建HBase table失败!"); e.printStackTrace(); } } return hbaseTable; } public List cachedInPut(List putList, String ip, int[] baseline, String attackType, String metricType){ Put rowPut = new Put(Bytes.toBytes(ip)); rowPut.addColumn( Bytes.toBytes(attackType), Bytes.toBytes(metricType), WritableUtils.toByteArray(toWritable(baseline))); putList.add(rowPut); return putList; } public List cachedInPut(List putList, String ip, int generateType, String attackType, String columnName){ Put rowPut = new Put(Bytes.toBytes(ip)); rowPut.addColumn( Bytes.toBytes(attackType), Bytes.toBytes(columnName), Bytes.toBytes(generateType)); putList.add(rowPut); return putList; } private Writable toWritable(int[] arr) { Writable[] content = new Writable[arr.length]; for (int i = 0; i < content.length; i++) { content[i] = new IntWritable(arr[i]); } return new ArrayWritable(IntWritable.class, content); } public ArrayList fromWritable(ArrayWritable writable) { Writable[] writables = ((ArrayWritable) writable).get(); ArrayList list = new ArrayList(writables.length); for (Writable wrt : writables) { list.add(((IntWritable)wrt).get()); } return list; } public void close(){ try { hbaseTable.close(); } catch (IOException e) { e.printStackTrace(); } } } \ No newline at end of file diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 5b0242b..52026dc 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -23,7 +23,7 @@ read.druid.min.time=1625414400000 #07-08 read.druid.max.time=1625673600000 -#字段映射 +#Druid字段映射 druid.attacktype.tcpsynflood=TCP SYN Flood druid.attacktype.udpflood=UDP Flood druid.attacktype.icmpflood=ICMP Flood @@ -34,6 +34,9 @@ druid.columnname.recvtime=__time druid.columnname.partition.num=partition_num baseline.metric.type=session_rate +#Hbase字段映射 +hbase.baseline.generation.type.suffix=baseline_type + #数据情况 #读取历史N天数据,最小值为3天(需要判断周期性) read.historical.days=3