diff --git a/src/main/java/cn/mesalab/dao/DruidData.java b/src/main/java/cn/mesalab/dao/DruidData.java
index ec28278..8540bf0 100644
--- a/src/main/java/cn/mesalab/dao/DruidData.java
+++ b/src/main/java/cn/mesalab/dao/DruidData.java
@@ -32,10 +32,6 @@ public class DruidData {
     private static DruidData druidData;
     private AvaticaConnection connection;
     private AvaticaStatement statement;
-    private String timeFilter = ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
-            + " >= MILLIS_TO_TIMESTAMP(" + getTimeLimit()._2
-            + ") AND " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
-            + " < MILLIS_TO_TIMESTAMP(" + getTimeLimit()._1 + ")";

     {
@@ -69,13 +65,13 @@ public class DruidData {
      * Fetch the distinct server IPs.
      * @return ArrayList of IPs
      */
-    public ArrayList<String> getServerIpList() {
+    public static ArrayList<String> getServerIpList(AvaticaStatement statement, String timeFilter) {
         Long startQueryIpLIstTime = System.currentTimeMillis();
         ArrayList<String> serverIps = new ArrayList<>();
         String sql = "SELECT distinct " + ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME
                 + " FROM " + ApplicationConfig.DRUID_TABLE
                 + " WHERE " + timeFilter
-                + " LIMIT 1000";// FOR TEST
+                + " LIMIT 200";// FOR TEST
         try{
             ResultSet resultSet = DruidUtils.executeQuery(statement,sql);
             while(resultSet.next()){
@@ -96,7 +92,7 @@ public class DruidData {
      * @param ipList list of IPs
      * @return rows read from the database
      */
-    public List<Map<String, Object>> readFromDruid(List<String> ipList){
+    public static List<Map<String, Object>> readFromDruid(AvaticaConnection connection, AvaticaStatement statement, List<String> ipList, String timeFilter){
         List<Map<String, Object>> rsList = null;
         ipList = ipList.stream().map( ip -> "\'"+ip+"\'").collect(Collectors.toList());
         String ipString = "(" + StringUtils.join(ipList, ",").toString() + ")";
@@ -125,7 +121,7 @@ public class DruidData {
      * @param attackType the attack type to filter on
      * @return the filtered rows
      */
-    public List<Map<String, Object>> getTimeSeriesData(List<Map<String, Object>> allData, String ip, String attackType){
+    public static List<Map<String, Object>> getTimeSeriesData(List<Map<String, Object>> allData, String ip, String attackType){
         List<Map<String, Object>> rsList = new ArrayList<>();
         try{
             rsList = allData.stream().
@@ -141,7 +137,7 @@ public class DruidData {
      * Compute the query time range; an explicit range can be set (for testing) or the default configuration is used.
      * @return start and end of the time range
      */
-    public Tuple2<Long, Long> getTimeLimit(){
+    public static Tuple2<Long, Long> getTimeLimit(){
         long maxTime = 0L;
         long minTime = 0L;
         switch(ApplicationConfig.DRUID_TIME_LIMIT_TYPE){
@@ -159,7 +155,7 @@ public class DruidData {
         return Tuple.of(maxTime, minTime);
     }

-    private long getCurrentDay(int bias) {
+    private static long getCurrentDay(int bias) {
         Calendar calendar = Calendar.getInstance();
         calendar.set(Calendar.DAY_OF_YEAR, calendar.get(Calendar.DAY_OF_YEAR) + bias);
         calendar.set(Calendar.HOUR_OF_DAY, 0);
@@ -170,7 +166,7 @@ public class DruidData {
         return calendar.getTimeInMillis();
     }

-    private long getCurrentDay(){
+    private static long getCurrentDay(){
         return getCurrentDay(0);
     }
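With these helpers now static, the caller owns the connection, the statement, and the time filter. A minimal usage sketch, assuming a hypothetical recv_time column in place of ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME (as in the removed field above, ._2 is the lower bound and ._1 the upper bound of the window):

import cn.mesalab.dao.DruidData;
import cn.mesalab.utils.DruidUtils;
import org.apache.calcite.avatica.AvaticaConnection;
import org.apache.calcite.avatica.AvaticaStatement;

import java.util.ArrayList;

public class DruidDataUsageSketch {
    public static void main(String[] args) throws Exception {
        AvaticaConnection conn = DruidUtils.getConn();
        AvaticaStatement stmt = DruidUtils.getStatement(conn);

        String column = "recv_time"; // hypothetical; the real name comes from ApplicationConfig
        String timeFilter = column
                + " >= MILLIS_TO_TIMESTAMP(" + DruidData.getTimeLimit()._2
                + ") AND " + column
                + " < MILLIS_TO_TIMESTAMP(" + DruidData.getTimeLimit()._1 + ")";
        // With getTimeLimit() == (1628035200000, 1627948800000), this expands to the Druid SQL predicate:
        //   recv_time >= MILLIS_TO_TIMESTAMP(1627948800000) AND recv_time < MILLIS_TO_TIMESTAMP(1628035200000)
        // i.e. a half-open window [minTime, maxTime).

        ArrayList<String> serverIps = DruidData.getServerIpList(stmt, timeFilter);
        System.out.println("distinct server IPs: " + serverIps.size());
        conn.close();
    }
}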
diff --git a/src/main/java/cn/mesalab/main/BaselineApplication.java b/src/main/java/cn/mesalab/main/BaselineApplication.java
index 8bd6f13..d9c1694 100644
--- a/src/main/java/cn/mesalab/main/BaselineApplication.java
+++ b/src/main/java/cn/mesalab/main/BaselineApplication.java
@@ -1,7 +1,6 @@
 package cn.mesalab.main;

 import cn.mesalab.service.BaselineGeneration;
-import sun.rmi.runtime.Log;

 /**
  * @author yjy
@@ -10,6 +9,6 @@ import sun.rmi.runtime.Log;
  */
 public class BaselineApplication {
     public static void main(String[] args) {
-        BaselineGeneration.perform();
+        new BaselineGeneration().perform();
     }
 }
diff --git a/src/main/java/cn/mesalab/service/BaselineGeneration.java b/src/main/java/cn/mesalab/service/BaselineGeneration.java
index e72a0e6..d46b45e 100644
--- a/src/main/java/cn/mesalab/service/BaselineGeneration.java
+++ b/src/main/java/cn/mesalab/service/BaselineGeneration.java
@@ -2,21 +2,19 @@ package cn.mesalab.service;

 import cn.mesalab.config.ApplicationConfig;
 import cn.mesalab.dao.DruidData;
-import cn.mesalab.service.algorithm.KalmanFilter;
+import cn.mesalab.utils.DruidUtils;
 import cn.mesalab.utils.HbaseUtils;
-import cn.mesalab.utils.SeriesUtils;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.commons.math3.stat.StatUtils;
-import org.apache.hadoop.hbase.client.Put;
+import org.apache.calcite.avatica.AvaticaConnection;
+import org.apache.calcite.avatica.AvaticaStatement;
 import org.apache.hadoop.hbase.client.Table;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import java.io.IOException;
+import java.sql.SQLException;
 import java.util.*;
 import java.util.concurrent.*;
-import java.util.stream.Collectors;

 /**
  * @author yjy
@@ -27,10 +25,18 @@ import java.util.stream.Collectors;
 public class BaselineGeneration {
     private static final Logger LOG = LoggerFactory.getLogger(BaselineGeneration.class);

-    private static DruidData druidData;
-    private static HbaseUtils hbaseUtils;
-    private static Table hbaseTable;
-    private static List<Map<String, Object>> batchDruidData = new ArrayList<>();
+    private static AvaticaConnection druidConn = DruidUtils.getConn();
+    private static AvaticaStatement druidStatement;
+
+    static {
+        try {
+            druidStatement = DruidUtils.getStatement(druidConn);
+        } catch (SQLException exception) {
+            exception.printStackTrace();
+        }
+    }
+
+    private static Table hbaseTable = HbaseUtils.getInstance().getHbaseTable();

     private static List<String> attackTypeList = Arrays.asList(
             ApplicationConfig.DRUID_ATTACKTYPE_TCP_SYN_FLOOD,
@@ -41,17 +47,17 @@ public class BaselineGeneration {
     private static final Integer BASELINE_POINT_NUM =
             ApplicationConfig.BASELINE_RANGE_DAYS * 24 * (60/ApplicationConfig.HISTORICAL_GRAD);

+    private static String timeFilter = ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
+            + " >= MILLIS_TO_TIMESTAMP(" + DruidData.getTimeLimit()._2
+            + ") AND " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
+            + " < MILLIS_TO_TIMESTAMP(" + DruidData.getTimeLimit()._1 + ")";
+
     /**
      * Run the job.
      */
-    public static void perform() {
+    public void perform() {
         long start = System.currentTimeMillis();

-        druidData = DruidData.getInstance();
-        hbaseUtils = HbaseUtils.getInstance();
-        hbaseTable = hbaseUtils.getHbaseTable();
-        LOG.info("Druid connection established");
-
         try{
             // generate baselines and write them out
             generateBaselinesThread();
@@ -59,7 +65,7 @@ public class BaselineGeneration {
             long last = System.currentTimeMillis();
             LOG.warn("Elapsed time: " + (last - start));

-            druidData.closeConn();
+            druidConn.close();
             hbaseTable.close();
             LOG.info("Druid connection closed");
@@ -73,7 +79,7 @@ public class BaselineGeneration {
     /**
      * Entry point for multi-threaded baseline generation.
     * @throws InterruptedException
      */
-    private static void generateBaselinesThread() throws InterruptedException {
+    private void generateBaselinesThread() throws InterruptedException {
         int threadNum = Runtime.getRuntime().availableProcessors();

         ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
                 .setNameFormat("baseline-demo-%d").build();
@@ -90,16 +96,26 @@ public class BaselineGeneration {
                 new ThreadPoolExecutor.AbortPolicy());

         // fetch the IP list
-        ArrayList<String> destinationIps = druidData.getServerIpList();
+        ArrayList<String> destinationIps = DruidData.getServerIpList(druidStatement, timeFilter);

         LOG.info("Found " + destinationIps.size() + " server IPs");
         LOG.info("Baseline batch size: " + ApplicationConfig.GENERATE_BATCH_SIZE);

         // generate and process IP baselines in batches
         List<List<String>> batchIpLists = Lists.partition(destinationIps, ApplicationConfig.GENERATE_BATCH_SIZE);
+
         for (List<String> batchIps: batchIpLists){
             if(batchIps.size()>0){
-                executor.execute(() -> generateBaselines(batchIps));
+                BaselineSingleThread testForInsider = new BaselineSingleThread(
+                        batchIps,
+                        druidConn,
+                        druidStatement,
+                        hbaseTable,
+                        attackTypeList,
+                        BASELINE_POINT_NUM,
+                        timeFilter
+                );
+                executor.execute(testForInsider);
             }
         }

@@ -107,100 +123,4 @@ public class BaselineGeneration {
         executor.shutdown();
         executor.awaitTermination(10L, TimeUnit.HOURS);
     }

-    /**
-     * Generate baselines for a batch of IPs.
-     * @param ipList list of IPs
-     */
-    public static void generateBaselines(List<String> ipList){
-        druidData = DruidData.getInstance();
-        batchDruidData = druidData.readFromDruid(ipList);
-
-        List<Put> putList = new ArrayList<>();
-        for(String attackType: attackTypeList){
-            for(String ip: ipList){
-                int[] ipBaseline = generateSingleIpBaseline(ip, attackType);
-                if (ipBaseline != null){
-                    putList = hbaseUtils.cachedInPut(putList, ip, ipBaseline, attackType, ApplicationConfig.BASELINE_METRIC_TYPE);
-                }
-            }
-        }
-
-        try {
-            hbaseTable.put(putList);
-            LOG.info("Baseline thread " + Thread.currentThread().getId() + " wrote " + putList.size() + " baseline rows");
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-
-        druidData.closeConn();
-    }
-
-    /**
-     * Baseline generation logic for a single IP.
-     * @param ip the IP
-     * @param attackType the attack type
-     * @return the baseline series, of length 60/HISTORICAL_GRAD*24
-     */
-    private static int[] generateSingleIpBaseline(String ip, String attackType){
-        // query
-        List<Map<String, Object>> originSeries = druidData.getTimeSeriesData(batchDruidData, ip, attackType);
-
-        if (originSeries.size()==0){
-            return null;
-        }
-
-        // fill missing points in the time series with 0
-        List<Map<String, Object>> completSeries = SeriesUtils.complementSeries(originSeries);
-
-        int[] baselineArr = new int[BASELINE_POINT_NUM];
-        List<Integer> series = completSeries.stream().map(
-                i -> Integer.valueOf(i.get(ApplicationConfig.BASELINE_METRIC_TYPE).toString())).collect(Collectors.toList());
-
-        // check how frequently the IP appears
-        if(originSeries.size()/(float)completSeries.size()>ApplicationConfig.BASELINE_HISTORICAL_RATIO){
-            // high frequency
-            double percentile = StatUtils.percentile(series.stream().mapToDouble(Double::valueOf).toArray(),
-                    ApplicationConfig.BASELINE_SPARSE_FILL_PERCENTILE);
-            Arrays.fill(baselineArr, (int)percentile);
-            baselineArr = baselineFunction(series);
-
-        } else {
-            // check for periodicity
-            if (SeriesUtils.isPeriod(series)){
-                baselineArr = baselineFunction(series);
-            } else {
-                int ipPercentile = SeriesUtils.percentile(
-                        originSeries.stream().map(i ->
-                                Integer.valueOf(i.get(ApplicationConfig.BASELINE_METRIC_TYPE).toString())).collect(Collectors.toList()),
-                        ApplicationConfig.BASELINE_RATIONAL_PERCENTILE);
-                Arrays.fill(baselineArr, ipPercentile);
-            }
-        }
-
-        return baselineArr;
-    }
-
-    /**
-     * Baseline generation algorithm.
-     * @param timeSeries input series
-     * @return output series
-     */
-    private static int[] baselineFunction(List<Integer> timeSeries){
-        int[] result;
-        switch (ApplicationConfig.BASELINE_FUNCTION){
-            case "KalmanFilter":
-                KalmanFilter kalmanFilter = new KalmanFilter();
-                kalmanFilter.forcast(timeSeries, BASELINE_POINT_NUM);
-                result = kalmanFilter.getForecastSeries().stream().mapToInt(Integer::valueOf).toArray();
-                break;
-            default:
-                result = timeSeries.subList(0, BASELINE_POINT_NUM).stream().mapToInt(Integer::valueOf).toArray();
-        }
-        return result;
-    }
-
-    public static void main(String[] args) {
-        perform();
-    }
-
 }
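In isolation, the batching pattern used by generateBaselinesThread (Guava's Lists.partition feeding a bounded ThreadPoolExecutor) looks like this. A self-contained sketch with the task body reduced to a print; note that AbortPolicy makes execute() throw RejectedExecutionException once the 1024-slot queue fills up:

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

public class BatchSubmitSketch {
    public static void main(String[] args) throws InterruptedException {
        int threadNum = Runtime.getRuntime().availableProcessors();
        ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("baseline-demo-%d").build();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                threadNum, threadNum, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1024), factory,
                new ThreadPoolExecutor.AbortPolicy());

        List<String> ips = Arrays.asList("10.0.0.1", "10.0.0.2", "10.0.0.3", "10.0.0.4", "10.0.0.5");
        // One task per batch; GENERATE_BATCH_SIZE plays the role of 2 in the real code.
        for (List<String> batch : Lists.partition(ips, 2)) {
            executor.execute(() -> System.out.println(
                    Thread.currentThread().getName() + " -> " + batch));
        }
        executor.shutdown();
        executor.awaitTermination(1L, TimeUnit.MINUTES);
    }
}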
diff --git a/src/main/java/cn/mesalab/service/BaselineSingleThread.java b/src/main/java/cn/mesalab/service/BaselineSingleThread.java
new file mode 100644
index 0000000..988355a
--- /dev/null
+++ b/src/main/java/cn/mesalab/service/BaselineSingleThread.java
@@ -0,0 +1,142 @@
+package cn.mesalab.service;
+
+import cn.mesalab.config.ApplicationConfig;
+import cn.mesalab.dao.DruidData;
+import cn.mesalab.service.algorithm.KalmanFilter;
+import cn.mesalab.utils.HbaseUtils;
+import cn.mesalab.utils.SeriesUtils;
+import org.apache.calcite.avatica.AvaticaConnection;
+import org.apache.calcite.avatica.AvaticaStatement;
+import org.apache.commons.math3.stat.StatUtils;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * @author yjy
+ * @version 1.0
+ * @date 2021/8/3 6:18 PM
+ */
+public class BaselineSingleThread extends Thread {
+    private static final Logger LOG = LoggerFactory.getLogger(BaselineSingleThread.class);
+
+    private List<String> ipList;
+    private AvaticaConnection druidConn;
+    private AvaticaStatement druidStatement;
+    private Table hbaseTable;
+    private List<String> attackTypeList;
+    private Integer BASELINE_POINT_NUM;
+    private String timeFilter;
+    private List<Map<String, Object>> batchDruidData;
+
+    public BaselineSingleThread(
+            List<String> batchIpList,
+            AvaticaConnection druidConn,
+            AvaticaStatement druidStatement,
+            Table hbaseTable,
+            List<String> attackTypeList,
+            Integer BASELINE_POINT_NUM,
+            String timeFilter
+    ){
+        this.ipList = batchIpList;
+        this.druidConn = druidConn;
+        this.druidStatement = druidStatement;
+        this.hbaseTable = hbaseTable;
+        this.attackTypeList = attackTypeList;
+        this.BASELINE_POINT_NUM = BASELINE_POINT_NUM;
+        this.timeFilter = timeFilter;
+    }
+
+    @Override
+    public void run(){
+        batchDruidData = DruidData.readFromDruid(druidConn, druidStatement, ipList, timeFilter);
+
+        List<Put> putList = new ArrayList<>();
+        for(String attackType: attackTypeList){
+            for(String ip: ipList){
+                int[] ipBaseline = generateSingleIpBaseline(ip, attackType);
+                if (ipBaseline != null){
+                    putList = HbaseUtils.cachedInPut(putList, ip, ipBaseline, attackType, ApplicationConfig.BASELINE_METRIC_TYPE);
+                }
+            }
+        }
+        try {
+            hbaseTable.put(putList);
+            LOG.info("Baseline thread " + Thread.currentThread().getId() + " wrote " + putList.size() + " baseline rows");
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Baseline generation logic for a single IP.
+     * @param ip the IP
+     * @param attackType the attack type
+     * @return the baseline series, of length 60/HISTORICAL_GRAD*24
+     */
+    private int[] generateSingleIpBaseline(String ip, String attackType){
+        // query
+        List<Map<String, Object>> originSeries = DruidData.getTimeSeriesData(batchDruidData, ip, attackType);
+
+        if (originSeries.size()==0){
+            return null;
+        }
+
+        // fill missing points in the time series with 0
+        List<Map<String, Object>> completSeries = SeriesUtils.complementSeries(originSeries);
+
+        int[] baselineArr = new int[BASELINE_POINT_NUM];
+        List<Integer> series = completSeries.stream().map(
+                i -> Integer.valueOf(i.get(ApplicationConfig.BASELINE_METRIC_TYPE).toString())).collect(Collectors.toList());
+
+        // check how frequently the IP appears
+        if(originSeries.size()/(float)completSeries.size()>ApplicationConfig.BASELINE_HISTORICAL_RATIO){
+            // high frequency
+            double percentile = StatUtils.percentile(series.stream().mapToDouble(Double::valueOf).toArray(),
+                    ApplicationConfig.BASELINE_SPARSE_FILL_PERCENTILE);
+            Arrays.fill(baselineArr, (int)percentile);
+            baselineArr = baselineFunction(series);
+
+        } else {
+            // check for periodicity
+            if (SeriesUtils.isPeriod(series)){
+                baselineArr = baselineFunction(series);
+            } else {
+                int ipPercentile = SeriesUtils.percentile(
+                        originSeries.stream().map(i ->
+                                Integer.valueOf(i.get(ApplicationConfig.BASELINE_METRIC_TYPE).toString())).collect(Collectors.toList()),
+                        ApplicationConfig.BASELINE_RATIONAL_PERCENTILE);
+                Arrays.fill(baselineArr, ipPercentile);
+            }
+        }
+
+        return baselineArr;
+    }
+
+    /**
+     * Baseline generation algorithm.
+     * @param timeSeries input series
+     * @return output series
+     */
+    private int[] baselineFunction(List<Integer> timeSeries){
+        int[] result;
+        switch (ApplicationConfig.BASELINE_FUNCTION){
+            case "KalmanFilter":
+                KalmanFilter kalmanFilter = new KalmanFilter();
+                kalmanFilter.forcast(timeSeries, BASELINE_POINT_NUM);
+                result = kalmanFilter.getForecastSeries().stream().mapToInt(Integer::valueOf).toArray();
+                break;
+            default:
+                result = timeSeries.subList(0, BASELINE_POINT_NUM).stream().mapToInt(Integer::valueOf).toArray();
+        }
+        return result;
+    }
+}
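The numeric core of generateSingleIpBaseline is the percentile fill. A self-contained sketch of that step using commons-math3 with made-up values (note that in the high-frequency branch above, the filled array is immediately overwritten by baselineFunction(series), so the StatUtils percentile only survives in the non-periodic branch):

import org.apache.commons.math3.stat.StatUtils;

import java.util.Arrays;
import java.util.List;

public class PercentileFillSketch {
    public static void main(String[] args) {
        // Made-up series; BASELINE_POINT_NUM and the percentile come from config in the real code.
        List<Integer> series = Arrays.asList(3, 5, 8, 13, 21, 34, 55, 89);
        int pointNum = 6;
        double p = 50;

        // StatUtils.percentile takes a double[] and a percentile in (0, 100].
        double percentile = StatUtils.percentile(
                series.stream().mapToDouble(Double::valueOf).toArray(), p);

        int[] baseline = new int[pointNum];
        Arrays.fill(baseline, (int) percentile);
        // With commons-math3's default estimator this prints [17, 17, 17, 17, 17, 17]
        System.out.println(Arrays.toString(baseline));
    }
}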
diff --git a/src/main/java/cn/mesalab/utils/DruidUtils.java b/src/main/java/cn/mesalab/utils/DruidUtils.java
index 8224d37..a9967b1 100644
--- a/src/main/java/cn/mesalab/utils/DruidUtils.java
+++ b/src/main/java/cn/mesalab/utils/DruidUtils.java
@@ -19,16 +19,19 @@ public class DruidUtils {
     private static ThreadLocal<AvaticaConnection> threadLocal = new ThreadLocal<>();
     private static final String DRUID_URL = ApplicationConfig.DRUID_URL;

-    private static AvaticaStatement statement = null;

     /**
      * Open a connection.
      * @throws SQLException
      */
-    public static AvaticaConnection getConn() throws SQLException {
+    public static AvaticaConnection getConn() {
         Properties properties = new Properties();
-        properties.setProperty("connectTimeout", String.valueOf(10*60*60));
-        AvaticaConnection connection = (AvaticaConnection) DriverManager.getConnection(DRUID_URL, properties);
+        AvaticaConnection connection = null;
+        try {
+            connection = (AvaticaConnection) DriverManager.getConnection(DRUID_URL, properties);
+        } catch (SQLException exception) {
+            exception.printStackTrace();
+        }
         threadLocal.set(connection);
         return connection;
     }
@@ -48,8 +51,12 @@ public class DruidUtils {
     /**
      * Query results for the given SQL.
      */
     public static ResultSet executeQuery (AvaticaStatement statement, String sql) throws SQLException{
-        ResultSet  resultSet = statement.executeQuery(sql);
+        ResultSet resultSet = statement.executeQuery(sql);
         return resultSet;
     }

+    public static AvaticaStatement getStatement(AvaticaConnection conn) throws SQLException {
+        return conn.createStatement();
+    }
+
 }
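Since getConn() now swallows the SQLException and can return null, callers should guard before creating a statement. A minimal query sketch against the refactored helpers (the SELECT 1 is a placeholder query, not one the project uses):

import cn.mesalab.utils.DruidUtils;
import org.apache.calcite.avatica.AvaticaConnection;
import org.apache.calcite.avatica.AvaticaStatement;

import java.sql.ResultSet;
import java.sql.SQLException;

public class DruidQuerySketch {
    public static void main(String[] args) throws SQLException {
        AvaticaConnection conn = DruidUtils.getConn();
        if (conn == null) {
            // getConn() prints the stack trace and returns null on failure
            throw new IllegalStateException("could not connect to Druid");
        }
        AvaticaStatement stmt = DruidUtils.getStatement(conn);
        ResultSet rs = DruidUtils.executeQuery(stmt, "SELECT 1"); // placeholder
        while (rs.next()) {
            System.out.println(rs.getInt(1));
        }
        rs.close();
        stmt.close();
        conn.close();
    }
}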
diff --git a/src/main/java/cn/mesalab/utils/HbaseUtils.java b/src/main/java/cn/mesalab/utils/HbaseUtils.java
index 86123bb..a79bfda 100644
--- a/src/main/java/cn/mesalab/utils/HbaseUtils.java
+++ b/src/main/java/cn/mesalab/utils/HbaseUtils.java
@@ -1 +1 @@
 package cn.mesalab.utils;

 import cn.mesalab.config.ApplicationConfig;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;

 /**
  * @author yjy
  * @version 1.0
  * @date 2021/7/23 4:56 PM
  */
 public class HbaseUtils {
     private static final Logger LOG = LoggerFactory.getLogger(HbaseUtils.class);
     private static HbaseUtils hbaseUtils;

     static {
         hbaseUtils = HbaseUtils.getInstance();
     }

     public static HbaseUtils getInstance(){
         if (hbaseUtils == null) {
             hbaseUtils = new HbaseUtils();
         }
         return hbaseUtils;
     }

     public Table getHbaseTable(){
         Table hbaseTable = null;
         try{
             Configuration config = HBaseConfiguration.create();
             config.set(HConstants.ZOOKEEPER_QUORUM, ApplicationConfig.HBASE_ZOOKEEPER_QUORUM);
             config.set(HConstants.ZOOKEEPER_CLIENT_PORT, ApplicationConfig.HBASE_ZOOKEEPER_CLIENT_PORT);
             TableName tableName = TableName.valueOf(ApplicationConfig.HBASE_TABLE);
             Connection conn = ConnectionFactory.createConnection(config);
             hbaseTable = conn.getTable(tableName);
         } catch (IOException e){
             LOG.error("HBase: failed to create the HBase table!");
             e.printStackTrace();
         }
         return hbaseTable;
     }

-    public List<Put> cachedInPut(List<Put> putList, String ip, int[] baseline, String attackType, String metricType){
+    public static List<Put> cachedInPut(List<Put> putList, String ip, int[] baseline, String attackType, String metricType){
         Put rowPut = new Put(Bytes.toBytes(ip));

         // FOR TEST
         // start
         if(attackType.equals(ApplicationConfig.DRUID_ATTACKTYPE_TCP_SYN_FLOOD)){
             attackType = "TCP SYN Flood";
         } else if(attackType.equals(ApplicationConfig.DRUID_ATTACKTYPE_UDP_FLOOD)){
             attackType = "UDP Flood";
         } else if(attackType.equals(ApplicationConfig.DRUID_ATTACKTYPE_ICMP_FLOOD)){
             attackType = "ICMP Flood";
         } else {
             attackType = "DNS Amplification";
         }
         // end

         rowPut.addColumn(
                 Bytes.toBytes(attackType),
                 Bytes.toBytes(metricType),
                 WritableUtils.toByteArray(toWritable(baseline)));
         putList.add(rowPut);
         return putList;
     }

     private static Writable toWritable(int[] arr) {
         Writable[] content = new Writable[arr.length];
         for (int i = 0; i < content.length; i++) {
             content[i] = new IntWritable(arr[i]);
         }
         return new ArrayWritable(IntWritable.class, content);
     }

     public static ArrayList<Integer> fromWritable(ArrayWritable writable) {
         Writable[] writables = writable.get();
         ArrayList<Integer> list = new ArrayList<>(writables.length);
         for (Writable wrt : writables) {
             list.add(((IntWritable) wrt).get());
         }
         return list;
     }
 }
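The int[] round trip behind cachedInPut and the read path, as a self-contained sketch (only hadoop-common is needed; the byte[] produced here is exactly what lands in the HBase cell):

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableUtils;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.Arrays;

public class WritableRoundTripSketch {
    public static void main(String[] args) throws IOException {
        int[] baseline = {1, 2, 3};

        // Write path: wrap each int, then serialize the array to bytes.
        IntWritable[] content = new IntWritable[baseline.length];
        for (int i = 0; i < baseline.length; i++) {
            content[i] = new IntWritable(baseline[i]);
        }
        byte[] bytes = WritableUtils.toByteArray(new ArrayWritable(IntWritable.class, content));

        // Read path: mirrors HBaseTest, readFields on a fresh ArrayWritable.
        ArrayWritable w = new ArrayWritable(IntWritable.class);
        w.readFields(new DataInputStream(new ByteArrayInputStream(bytes)));
        int[] restored = Arrays.stream(w.get()).mapToInt(x -> ((IntWritable) x).get()).toArray();
        System.out.println(Arrays.toString(restored)); // [1, 2, 3]
    }
}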
diff --git a/src/main/java/cn/mesalab/utils/SeriesUtils.java b/src/main/java/cn/mesalab/utils/SeriesUtils.java
index 17f84b3..b01678e 100644
--- a/src/main/java/cn/mesalab/utils/SeriesUtils.java
+++ b/src/main/java/cn/mesalab/utils/SeriesUtils.java
@@ -2,7 +2,6 @@ package cn.mesalab.utils;

 import cn.mesalab.config.ApplicationConfig;
 import cn.mesalab.dao.DruidData;
-import cn.mesalab.service.BaselineGeneration;
 import com.google.common.collect.Lists;
 import org.jfree.util.Log;
 import org.slf4j.Logger;
@@ -10,13 +9,11 @@ import org.slf4j.LoggerFactory;

 import java.io.BufferedReader;
 import java.io.FileReader;
-import java.lang.reflect.Array;
 import java.time.Duration;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.*;
-import java.util.stream.Stream;

 /**
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index 8a705ec..42c9c32 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -54,7 +54,7 @@ baseline.kalman.r=0.002
 # print a log every 1000 updated records
 log.write.count=10000
 # FOR TEST
-generate.batch.size=10
+generate.batch.size=100

 # http client configuration
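ApplicationConfig itself is not part of this diff; a hypothetical sketch of how a key such as generate.batch.size could be read with plain java.util.Properties (the project's actual loader may differ):

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class ConfigSketch {
    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        // Loads application.properties from the classpath root.
        try (InputStream in = ConfigSketch.class.getResourceAsStream("/application.properties")) {
            props.load(in);
        }
        int batchSize = Integer.parseInt(props.getProperty("generate.batch.size", "100"));
        System.out.println("generate.batch.size = " + batchSize);
    }
}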
diff --git a/src/test/java/cn/mesalab/service/HBaseTest.java b/src/test/java/cn/mesalab/service/HBaseTest.java
index 9598865..b67472f 100644
--- a/src/test/java/cn/mesalab/service/HBaseTest.java
+++ b/src/test/java/cn/mesalab/service/HBaseTest.java
@@ -37,18 +37,36 @@ public class HBaseTest {

         Table table = conn.getTable(tableName);

-        DruidData druidData = DruidData.getInstance();
-        ArrayList<String> destinationIps = druidData.getServerIpList();
+//        DruidData druidData = DruidData.getInstance();
+//        ArrayList<String> destinationIps = druidData.getServerIpList();
+        List<String> ips = Arrays.asList(
+                "192.168.1.1",
+                "192.168.1.2",
+                "192.168.1.3",
+                "192.168.1.4",
+                "192.168.1.5",
+                "192.168.1.6",
+                "192.168.1.7",
+                "192.168.1.8",
+                "192.168.10.1",
+                "192.168.10.2",
+                "192.168.10.3",
+                "192.168.10.4",
+                "192.168.10.5",
+                "192.168.10.6",
+                "192.168.10.7",
+                "192.168.10.8"
+        );

-        for (String ip : destinationIps){
+        for (String ip : ips){
             Get abcGet = new Get(Bytes.toBytes(ip));
             Result r = table.get(abcGet);
             ArrayWritable w = new ArrayWritable(IntWritable.class);
             List<String> attackTypeList = Arrays.asList(
                     "TCP SYN Flood",
-                    "ICMP Flood",
-                    "UDP Flood",
-                    "DNS Amplification"
+                    "ICMP Flood"
+//                    "UDP Flood",
+//                    "DNS Amplification"
             );
             for (String attackType : attackTypeList){
                 byte[] session_nums = r.getValue(Bytes.toBytes(attackType), Bytes.toBytes("session_num"));
@@ -62,12 +80,35 @@ public class HBaseTest {

         }

-//        Get abcGet = new Get(Bytes.toBytes("1.0.0.1"));
-//        Result r = table.get(abcGet);
-//        ArrayWritable w = new ArrayWritable(IntWritable.class);
-//        w.readFields(new DataInputStream(new ByteArrayInputStream(r.getValue(Bytes.toBytes("TCP SYN Flood"), Bytes.toBytes("session_num")))));
-//        ArrayList arr2 = fromWritable(w);
-//        System.out.println(arr2.toString());
+
+
+//        int[] arr = new int[144];
+//        Arrays.fill(arr, 100);
+//        List ips = Arrays.asList(
+//                "192.168.1.1",
+//                "192.168.1.2",
+//                "192.168.1.3",
+//                "192.168.1.4",
+//                "192.168.1.5",
+//                "192.168.1.6",
+//                "192.168.1.7",
+//                "192.168.1.8",
+//                "192.168.10.1",
+//                "192.168.10.2",
+//                "192.168.10.3",
+//                "192.168.10.4",
+//                "192.168.10.5",
+//                "192.168.10.6",
+//                "192.168.10.7",
+//                "192.168.10.8"
+//        );
+//
+//        for (String ip : ips){
+//            Put put = new Put(Bytes.toBytes(ip));
+//            put.addColumn(Bytes.toBytes("ICMP Flood"), Bytes.toBytes("session_num"), WritableUtils.toByteArray(toWritable(arr)));
+//            table.put(put);
+//        }
+
     }
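If the row keys were not known in advance, the same cells could be read back with a full-table Scan instead of the hardcoded IP list. A sketch using the standard HBase client API, not what the test currently does:

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

public class ScanSketch {
    // Iterates every row and reports the stored value size for one column.
    static void dump(Table table) throws IOException {
        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes("ICMP Flood"), Bytes.toBytes("session_num"));
        try (ResultScanner scanner = table.getScanner(scan)) {
            for (Result r : scanner) {
                byte[] value = r.getValue(Bytes.toBytes("ICMP Flood"), Bytes.toBytes("session_num"));
                System.out.println(Bytes.toString(r.getRow()) + " -> "
                        + (value == null ? "no value" : value.length + " bytes"));
            }
        }
    }
}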
"http://192.168.44.12:8082/druid/v2/sql"; + DruidQueryParam druidQueryParam = getDruidQueryParam(sql); + int socketTimeout = ApplicationConfig.HTTP_RESPONSE_TIMEOUT; + Map stringStringMap = httpClientUtils.httpPost(queryUrl, JsonMapper.toJsonString(druidQueryParam), socketTimeout); + System.out.println(stringStringMap.toString()); + return stringStringMap; + } + + public static DruidQueryParam getDruidQueryParam(String sql) { + DruidQueryParam druidQueryParam = new DruidQueryParam(); + druidQueryParam.setQuery(sql); + druidQueryParam.getContext().put("skipEmptyBuckets", "true"); + druidQueryParam.setResultFormat("object"); + return druidQueryParam; + } +} +class DruidQueryParam { + private String query; + private Map context = Maps.newHashMap(); + private String resultFormat; + public String getQuery() { + return query; + } + + public void setQuery(String query) { + this.query = query; + } + + public Map getContext() { + return context; + } + + public void setContext(Map context) { + this.context = context; + } + + public String getResultFormat() { + return resultFormat; + } + + public void setResultFormat(String resultFormat) { + this.resultFormat = resultFormat; + } } \ No newline at end of file