diff --git a/src/main/java/cn/mesalab/config/ApplicationConfig.java b/src/main/java/cn/mesalab/config/ApplicationConfig.java
index 3a78215..f492223 100644
--- a/src/main/java/cn/mesalab/config/ApplicationConfig.java
+++ b/src/main/java/cn/mesalab/config/ApplicationConfig.java
@@ -32,6 +32,7 @@ public class ApplicationConfig {
     public static final String DRUID_ATTACKTYPE_ICMP_FLOOD = ConfigUtils.getStringProperty("druid.attacktype.icmpflood");
     public static final String DRUID_ATTACKTYPE_DNS_AMPL = ConfigUtils.getStringProperty("druid.attacktype.dnsamplification");
     public static final String DRUID_SERVERIP_COLUMN_NAME = ConfigUtils.getStringProperty("druid.columnname.serverip");
+    public static final String DRUID_VSYSID_COLUMN_NAME = ConfigUtils.getStringProperty("druid.columnname.vsysid");
     public static final String DRUID_ATTACKTYPE_COLUMN_NAME = ConfigUtils.getStringProperty("druid.columnname.attacktype");
     public static final String DRUID_RECVTIME_COLUMN_NAME = ConfigUtils.getStringProperty("druid.columnname.recvtime");
     public static final String DRUID_COLUMNNAME_PARTITION_NUM = ConfigUtils.getStringProperty("druid.columnname.partition.num");
diff --git a/src/main/java/cn/mesalab/dao/DruidData.java b/src/main/java/cn/mesalab/dao/DruidData.java
index d0f486f..dc82c57 100644
--- a/src/main/java/cn/mesalab/dao/DruidData.java
+++ b/src/main/java/cn/mesalab/dao/DruidData.java
@@ -48,21 +48,23 @@ public class DruidData {
     }

     public static Map<String, List<Map<String, Object>>> selectAll(List<Map<String, Object>> result) {
-        Map<String, List<Map<String, Object>>> allIpDataList = new HashMap<>();
-        ArrayList<String> ipList = new ArrayList<>();
+        Map<String, List<Map<String, Object>>> allKeyDataList = new HashMap<>();
+        ArrayList<String> keyList = new ArrayList<>();
         for (Map<String, Object> rowData : result) {
             String ip = (String) rowData.get(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME);
-            if (!ipList.contains(ip)) {
-                ipList.add(ip);
-                List<Map<String, Object>> ipData = new ArrayList<>();
-                allIpDataList.put(ip, ipData);
+            String vsysId = Long.toString((Long) rowData.get(ApplicationConfig.DRUID_VSYSID_COLUMN_NAME));
+            String key = ip + "-" + vsysId;
+            if (!keyList.contains(key)) {
+                keyList.add(key);
+                List<Map<String, Object>> keyData = new ArrayList<>();
+                allKeyDataList.put(key, keyData);
             }
             rowData.remove(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME);
-            allIpDataList.get(ip).add(rowData);
+            allKeyDataList.get(key).add(rowData);
         }
-        return allIpDataList;
+        return allKeyDataList;
     }
@@ -158,9 +160,10 @@ public class DruidData {
                 + ") AND " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
                 + " < MILLIS_TO_TIMESTAMP(" + endTime + ")";

-        return "SELECT "+ ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME
-                + ", "+ ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME
-                + ", AVG("+ ApplicationConfig.BASELINE_METRIC_TYPE + ") as " + ApplicationConfig.BASELINE_METRIC_TYPE
+        String sql = "SELECT " + ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME
+                + ", " + ApplicationConfig.DRUID_VSYSID_COLUMN_NAME
+                + ", " + ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME
+                + ", AVG(" + ApplicationConfig.BASELINE_METRIC_TYPE + ") as " + ApplicationConfig.BASELINE_METRIC_TYPE
                 + ", " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
                 + " FROM " + ApplicationConfig.DRUID_TABLE
                 + " WHERE " + ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME + " IN " + attackList
@@ -168,8 +171,11 @@ public class DruidData {
                 + " AND " + timeFilter
                 + " AND " + partitionFilter
                 + " GROUP BY (" + ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME
+                + ", " + ApplicationConfig.DRUID_VSYSID_COLUMN_NAME
                 + ", " + ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME
                 + ", " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME + ")";
+        System.out.println(sql);
+        return sql;
     }

     /**
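
For orientation, here is roughly the SQL this builder now renders. A minimal sketch: the destination_ip, vsys_id, attack_type and __time names come from the application.properties change below, while the table name, metric name ("flows"), and filter literals are illustrative stand-ins, not values taken from this patch:

    public class BaselineSqlExample {
        public static void main(String[] args) {
            // Hypothetical rendering of the amended baseline query; only the
            // column names are grounded in this diff, the rest are stand-ins.
            String sql =
                    "SELECT destination_ip, vsys_id, attack_type, AVG(flows) as flows, __time"
                  + " FROM ddos_attack"
                  + " WHERE attack_type IN ('TCP SYN Flood', 'UDP Flood')"
                  + " AND __time >= MILLIS_TO_TIMESTAMP(1663430400000)"
                  + " AND __time < MILLIS_TO_TIMESTAMP(1663603200000)"
                  + " AND partition_num = 3"
                  + " GROUP BY (destination_ip, vsys_id, attack_type, __time)";
            System.out.println(sql);
        }
    }

Adding vsys_id to both the SELECT list and the GROUP BY is what keys the aggregation per virtual system rather than per IP alone.
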
diff --git a/src/main/java/cn/mesalab/service/BaselineSingleThread.java b/src/main/java/cn/mesalab/service/BaselineSingleThread.java
index b3fa542..aecf947 100644
--- a/src/main/java/cn/mesalab/service/BaselineSingleThread.java
+++ b/src/main/java/cn/mesalab/service/BaselineSingleThread.java
@@ -7,7 +7,6 @@ import cn.mesalab.utils.DruidUtils;
 import cn.mesalab.utils.HbaseUtils;
 import cn.mesalab.utils.RetryUtils;
 import cn.mesalab.utils.SeriesUtils;
-import com.google.common.collect.Lists;
 import io.vavr.Tuple3;
 import org.apache.calcite.avatica.AvaticaConnection;
 import org.apache.calcite.avatica.AvaticaStatement;
@@ -83,35 +82,38 @@ public class BaselineSingleThread extends Thread {
                 batchDruidData = new HashMap<>();
             }

-            LOG.info("Data processing finished, server IPs fetched: " + batchDruidData.size() +
+            LOG.info("Data processing finished, server IP + vsys_id keys fetched: " + batchDruidData.size() +
                     " elapsed: " + (System.currentTimeMillis() - start));

            // Baseline generation
            List<Put> putList = new ArrayList<>();
            for(String attackType: attackTypeList){
-                for(String ip: batchDruidData.keySet()){
-                    // Filter this ip's data for the given attack type
-                    List<Map<String, Object>> ipDruidData = batchDruidData.get(ip).stream()
+                for(String key: batchDruidData.keySet()){
+                    // Filter this key's (ip + vsys_id) data for the given attack type
+                    List<Map<String, Object>> keyDruidData = batchDruidData.get(key).stream()
                            .filter(i -> i.get(ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME).equals(attackType)).collect(Collectors.toList());
                    // Generate the baseline
-                    Tuple3<int[], Integer, Integer> tuple = generateSingleIpBaseline(ip, ipDruidData);
+                    Tuple3<int[], Integer, Integer> tuple = generateSingleBaseline(key, keyDruidData);
                    if(tuple!=null){
-                        int[] ipBaseline = tuple._1;
+                        int[] baseline = tuple._1;
                        int generateType = tuple._2;
                        int zeroReplaceValue = tuple._3;
-                        if ((BASELINE_SAVE_LEVEL >= generateType) && (ipBaseline!= null ) && (ip.length()>0)){
-                            hbaseUtils.cachedInPut(putList, ip, ipBaseline, attackType, ApplicationConfig.BASELINE_METRIC_TYPE);
-                            hbaseUtils.cachedInPut(putList, ip, generateType, attackType,
+
+                        List<String> keys = new ArrayList<>(Arrays.asList(key.split("-"))); // mutable copy: Arrays.asList returns a fixed-size view that throws on remove()
+                        keys.remove("");
+                        if ((BASELINE_SAVE_LEVEL >= generateType) && (baseline != null) && (keys.size() == 2)){
+                            hbaseUtils.cachedInPut(putList, key, baseline, attackType, ApplicationConfig.BASELINE_METRIC_TYPE);
+                            hbaseUtils.cachedInPut(putList, key, generateType, attackType,
                                    ApplicationConfig.BASELINE_METRIC_TYPE + "_" + ApplicationConfig.HBASE_BASELINE_GENERATION_TYPE_SUFFIX);
-                            hbaseUtils.cachedInPut(putList, ip, zeroReplaceValue, attackType,
+                            hbaseUtils.cachedInPut(putList, key, zeroReplaceValue, attackType,
                                    ApplicationConfig.BASELINE_METRIC_TYPE + "_" + ApplicationConfig.HBASE_BASELINE_ZERO_REPLACE_VALUE_SUFFIX);
                        }
                    }
                }
            }
            try {
-                LOG.info("MONITOR - ip frequency bins: " + frequencyBinCounter);
+                LOG.info("MONITOR - ip + vsysId frequency bins: " + frequencyBinCounter);
                LOG.info("MONITOR - generation type counts: " + generateTypeCounter);
                LOG.info("MONITOR - entries with no baseline: " + discardBaselineCounter + ", distinct IPs included: " + discardIpList.size());
                hbaseTable.put(putList);
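
The composite row key is parsed back by splitting on "-"; because Arrays.asList returns a fixed-size view, the split result is copied into a mutable ArrayList before remove("") is called. A self-contained sketch of this validity check, with hypothetical values:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class KeySplitSketch {
        public static void main(String[] args) {
            String key = "203.0.113.5" + "-" + "12";  // ip + "-" + vsysId, as built in DruidData.selectAll
            List<String> keys = new ArrayList<>(Arrays.asList(key.split("-"))); // mutable copy
            keys.remove("");                          // drops the empty token a missing ip would leave, e.g. "-12"
            System.out.println(keys.size() == 2);     // true: only well-formed "ip-vsysId" keys are written
        }
    }

Note the row key written to HBase stays the full "ip-vsysId" string; the split is only a sanity check before the Puts are queued.
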
@@ -151,36 +153,36 @@ public class BaselineSingleThread extends Thread {

     /**
      * Baseline generation logic for a single ip
-     * @param ip
-     * @param ipDruidData
+     * @param key
+     * @param keyDruidData
      * @return the baseline series, of length 60/HISTORICAL_GRAD*24;
      *         baselineGenerationType:
-     *         1: high-frequency IP
-     *         2: low-frequency IP with periodicity
-     *         3: other IPs, using a percentile-threshold baseline
+     *         1: high-frequency key
+     *         2: low-frequency key with periodicity
+     *         3: other keys, using a percentile-threshold baseline
      */
-    private Tuple3<int[], Integer, Integer> generateSingleIpBaseline(String ip, List<Map<String, Object>> ipDruidData){
-        // Skip (ip, attack type) pairs with no data
-        if (ipDruidData.size()==0){
-            updateDiscardCounter(ip);
+    private Tuple3<int[], Integer, Integer> generateSingleBaseline(String key, List<Map<String, Object>> keyDruidData){
+        // Skip keys with no data
+        if (keyDruidData.size()==0){
+            updateDiscardCounter(key);
             return null;
         }

-        List<Integer> originSeries = ipDruidData.stream().map(i ->
+        List<Integer> originSeries = keyDruidData.stream().map(i ->
                 Integer.valueOf(i.get(ApplicationConfig.BASELINE_METRIC_TYPE).toString())).collect(Collectors.toList());
         List<Integer> originNonZeroSeries = originSeries.stream().filter(i->i>0).collect(Collectors.toList());

         // Skip all-zero (ip, attack type) series
         if(originNonZeroSeries.size()==0){
-            updateDiscardCounter(ip);
+            updateDiscardCounter(key);
             return null;
         }

-        int ipPercentile = SeriesUtils.percentile(originSeries, ApplicationConfig.BASELINE_RATIONAL_PERCENTILE);
+        int percentile = SeriesUtils.percentile(originSeries, ApplicationConfig.BASELINE_RATIONAL_PERCENTILE);
         int baselineGenerationType;
         int[] baselineArr = new int[baselinePointNum];

         // Fill missing points in the time series with 0
-        List<Map<String, Object>> completSeries = SeriesUtils.complementSeries(ipDruidData);
+        List<Map<String, Object>> completSeries = SeriesUtils.complementSeries(keyDruidData);
         List<Integer> series = completSeries.stream().map( i ->
                 Integer.valueOf(i.get(ApplicationConfig.BASELINE_METRIC_TYPE).toString())).collect(Collectors.toList());
@@ -190,7 +192,7 @@ public class BaselineSingleThread extends Thread {
         // Outlier removal
         double exceptionPercentile = SeriesUtils.percentile(series, ApplicationConfig.BASELINE_EXECEPTION_PERCENTILE);
         double exceptionFillPercentile = SeriesUtils.percentile(series, ApplicationConfig.BASELINE_EXCECPTION_FILL_PERCENTILE);
-        LOG.debug(ip + ": series-" + series);
+        LOG.debug(key + ": series-" + series);
         for(int i=0; i<series.size(); i++){
             if(series.get(i) > exceptionPercentile){
                 series.set(i, (int) exceptionFillPercentile);
@@ -203,7 +205,7 @@ public class BaselineSingleThread extends Thread {
         double p50 = SeriesUtils.percentile(series, 0.50);

         // No periodicity
-        float ipFrequency = ipDruidData.size() / (float) completSeries.size();
+        float ipFrequency = keyDruidData.size() / (float) completSeries.size();
         updateLogFrequencyCounter(ipFrequency);
         // Frequency check
         if(ipFrequency > ApplicationConfig.BASELINE_HISTORICAL_FREQUENCY_THREAD ){
@@ -220,7 +222,7 @@ public class BaselineSingleThread extends Thread {
         // Default value: percentile of the non-zero data
         int defaultValue = SeriesUtils.percentile(originNonZeroSeries, ApplicationConfig.BASELINE_RATIONAL_PERCENTILE);
         if(defaultValue == 0){
-            LOG.error(ip + "-" + "baseline default value is 0 !");
+            LOG.error(key + "-" + "baseline default value is 0 !");
         }

         return new Tuple3<>(baselineArr, baselineGenerationType, defaultValue);
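
The outlier-removal step clips points above BASELINE_EXECEPTION_PERCENTILE down to the lower BASELINE_EXCECPTION_FILL_PERCENTILE. A compact stand-alone sketch of that logic; the nearest-rank percentile below is an assumption (SeriesUtils.percentile may compute it differently), and 0.85/0.75 are stand-ins for the two config values:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class ClipSketch {
        // Nearest-rank percentile (assumed; stand-in for SeriesUtils.percentile).
        static int percentile(List<Integer> s, double p) {
            List<Integer> sorted = new ArrayList<>(s);
            Collections.sort(sorted);
            return sorted.get((int) Math.ceil(p * sorted.size()) - 1);
        }

        public static void main(String[] args) {
            List<Integer> series = new ArrayList<>(Arrays.asList(4, 5, 4, 90, 5, 6, 4, 5));
            double cut  = percentile(series, 0.85);   // clipping threshold
            double fill = percentile(series, 0.75);   // replacement value
            for (int i = 0; i < series.size(); i++) {
                if (series.get(i) > cut) {
                    series.set(i, (int) fill);        // clip the outlier
                }
            }
            System.out.println(series);               // [4, 5, 4, 5, 5, 6, 4, 5]: the 90 spike is clipped to 5
        }
    }
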
diff --git a/src/main/java/cn/mesalab/utils/HbaseUtils.java b/src/main/java/cn/mesalab/utils/HbaseUtils.java
index 03bfd5e..6d21c1d 100644
--- a/src/main/java/cn/mesalab/utils/HbaseUtils.java
+++ b/src/main/java/cn/mesalab/utils/HbaseUtils.java
@@ -1 +1 @@
-package cn.mesalab.utils; import cn.mesalab.config.ApplicationConfig; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; import java.util.List; /** * @author yjy * @version 1.0 * @date 2021/7/23 4:56 PM */ public class HbaseUtils { private static final Logger LOG = LoggerFactory.getLogger(HbaseUtils.class); private static HbaseUtils hbaseUtils; private Table hbaseTable; static { hbaseUtils = HbaseUtils.getInstance(); } public static HbaseUtils getInstance(){ if (hbaseUtils == null) { hbaseUtils = new HbaseUtils(); } return hbaseUtils; } public Table getHbaseTable(){ if(hbaseTable == null){ try{ Configuration config = HBaseConfiguration.create(); config.set(HConstants.ZOOKEEPER_QUORUM, ApplicationConfig.HBASE_ZOOKEEPER_QUORUM); config.set(HConstants.ZOOKEEPER_CLIENT_PORT, ApplicationConfig.HBASE_ZOOKEEPER_CLIENT_PORT); TableName tableName = TableName.valueOf(ApplicationConfig.HBASE_TABLE); Connection conn = ConnectionFactory.createConnection(config); hbaseTable = conn.getTable(tableName); } catch (IOException e){ LOG.error("Failed to create the HBase table!"); e.printStackTrace(); } } return hbaseTable; } public List<Put> cachedInPut(List<Put> putList, String ip, int[] baseline, String attackType, String metricType){ Put rowPut = new Put(Bytes.toBytes(ip)); rowPut.addColumn( Bytes.toBytes(attackType), Bytes.toBytes(metricType), WritableUtils.toByteArray(toWritable(baseline))); putList.add(rowPut); return putList; } public List<Put> cachedInPut(List<Put> putList, String ip, int value, String columnFamily, String columnName){ Put rowPut = new Put(Bytes.toBytes(ip)); rowPut.addColumn( Bytes.toBytes(columnFamily), Bytes.toBytes(columnName), Bytes.toBytes(value)); putList.add(rowPut); return putList; } private Writable toWritable(int[] arr) { Writable[] content = new Writable[arr.length]; for (int i = 0; i < content.length; i++) { content[i] = new IntWritable(arr[i]); } return new ArrayWritable(IntWritable.class, content); } public ArrayList<Integer> fromWritable(ArrayWritable writable) { Writable[] writables = ((ArrayWritable) writable).get(); ArrayList<Integer> list = new ArrayList<>(writables.length); for (Writable wrt : writables) { list.add(((IntWritable)wrt).get()); } return list; } public void close(){ try { hbaseTable.close(); } catch (IOException e) { e.printStackTrace(); } } }
\ No newline at end of file
+package cn.mesalab.utils; import cn.mesalab.config.ApplicationConfig; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; import java.util.List; /** * @author yjy * @version 1.0 * @date 2021/7/23 4:56 PM */ public class HbaseUtils { private static final Logger LOG = LoggerFactory.getLogger(HbaseUtils.class); private static HbaseUtils hbaseUtils; private Table hbaseTable; static { hbaseUtils = HbaseUtils.getInstance(); } public static HbaseUtils getInstance(){ if (hbaseUtils == null) { hbaseUtils = new HbaseUtils(); } return hbaseUtils; } public Table getHbaseTable(){ if(hbaseTable == null){ try{ Configuration config = HBaseConfiguration.create(); config.set(HConstants.ZOOKEEPER_QUORUM, ApplicationConfig.HBASE_ZOOKEEPER_QUORUM); config.set(HConstants.ZOOKEEPER_CLIENT_PORT, ApplicationConfig.HBASE_ZOOKEEPER_CLIENT_PORT); TableName tableName = TableName.valueOf(ApplicationConfig.HBASE_TABLE); Connection conn = ConnectionFactory.createConnection(config); hbaseTable = conn.getTable(tableName); } catch (IOException e){ LOG.error("Failed to create the HBase table!"); e.printStackTrace(); } } return hbaseTable; } public List<Put> cachedInPut(List<Put> putList, String key, int[] baseline, String attackType, String metricType){ Put rowPut = new Put(Bytes.toBytes(key)); rowPut.addColumn( Bytes.toBytes(attackType), Bytes.toBytes(metricType), WritableUtils.toByteArray(toWritable(baseline))); putList.add(rowPut); return putList; } public List<Put> cachedInPut(List<Put> putList, String key, int value, String columnFamily, String columnName){ Put rowPut = new Put(Bytes.toBytes(key)); rowPut.addColumn( Bytes.toBytes(columnFamily), Bytes.toBytes(columnName), Bytes.toBytes(value)); putList.add(rowPut); return putList; } private Writable toWritable(int[] arr) { Writable[] content = new Writable[arr.length]; for (int i = 0; i < content.length; i++) { content[i] = new IntWritable(arr[i]); } return new ArrayWritable(IntWritable.class, content); } public ArrayList<Integer> fromWritable(ArrayWritable writable) { Writable[] writables = ((ArrayWritable) writable).get(); ArrayList<Integer> list = new ArrayList<>(writables.length); for (Writable wrt : writables) { list.add(((IntWritable)wrt).get()); } return list; } public void close(){ try { hbaseTable.close(); } catch (IOException e) { e.printStackTrace(); } } }
\ No newline at end of file
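
Baselines are stored as an ArrayWritable of IntWritable, serialized into the HBase cell with WritableUtils.toByteArray. A minimal round-trip sketch mirroring toWritable/fromWritable above, with hypothetical baseline values (requires hadoop-common on the classpath):

    import org.apache.hadoop.io.ArrayWritable;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableUtils;

    public class WritableRoundTrip {
        public static void main(String[] args) {
            int[] baseline = {3, 0, 7};                          // hypothetical baseline points
            // Encode, as toWritable() does:
            Writable[] content = new Writable[baseline.length];
            for (int i = 0; i < baseline.length; i++) {
                content[i] = new IntWritable(baseline[i]);
            }
            ArrayWritable encoded = new ArrayWritable(IntWritable.class, content);
            byte[] cell = WritableUtils.toByteArray(encoded);    // the byte[] written to the HBase cell
            System.out.println(cell.length + " bytes");
            // Decode, as fromWritable() does:
            for (Writable w : encoded.get()) {
                System.out.print(((IntWritable) w).get() + " "); // prints: 3 0 7
            }
        }
    }
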
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index 2104cfa..e730145 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -19,9 +19,9 @@ hbase.zookeeper.client.port=2181
 # How to bound the Druid read time range:
 # 0: read the default number of days from read.historical.days;
 # 1: use the explicit time range below
-read.druid.time.limit.type=0
-read.druid.min.time=1627747200000
-read.druid.max.time=1630425600000
+read.druid.time.limit.type=1
+read.druid.min.time=1663430400000
+read.druid.max.time=1663603200000

 #Druid column mapping
 druid.attacktype.tcpsynflood=TCP SYN Flood
@@ -29,6 +29,7 @@ druid.attacktype.udpflood=UDP Flood
 druid.attacktype.icmpflood=ICMP Flood
 druid.attacktype.dnsamplification=DNS Flood
 druid.columnname.serverip=destination_ip
+druid.columnname.vsysid=vsys_id
 druid.columnname.attacktype=attack_type
 druid.columnname.recvtime=__time
 druid.columnname.partition.num=partition_num
@@ -76,7 +77,7 @@ monitor.frequency.bin.num=100
 ##########################################
 ########### Concurrency settings #########
 ##########################################
-all.partition.num=100
+all.partition.num=10
 core.pool.size=10
 max.pool.size=10
 #The Druid partition column partition_num has a maximum value of 9999
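
For reference, a one-off sketch decoding the new explicit read window. The millisecond values are copied from the diff above; Asia/Shanghai is an assumed zone, inferred from the project's locale:

    import java.time.Instant;
    import java.time.ZoneId;

    public class ReadWindow {
        public static void main(String[] args) {
            ZoneId tz = ZoneId.of("Asia/Shanghai");  // assumed zone
            System.out.println(Instant.ofEpochMilli(1663430400000L).atZone(tz)); // 2022-09-18T00:00+08:00[Asia/Shanghai]
            System.out.println(Instant.ofEpochMilli(1663603200000L).atZone(tz)); // 2022-09-20T00:00+08:00[Asia/Shanghai]
            // i.e. the explicit range covers two days in September 2022.
        }
    }
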