diff --git a/src/main/java/cn/mesalab/config/ApplicationConfig.java b/src/main/java/cn/mesalab/config/ApplicationConfig.java
index dab7c8b..1e94280 100644
--- a/src/main/java/cn/mesalab/config/ApplicationConfig.java
+++ b/src/main/java/cn/mesalab/config/ApplicationConfig.java
@@ -51,8 +51,6 @@ public class ApplicationConfig {
     public static final Long DRUID_READ_BATCH_TIME_GRAD_HOUR = ConfigUtils.getLongProperty("druid.read.batch.time.grad.hour");
 
     public static final Integer THREAD_MAX_NUM = ConfigUtils.getIntProperty("thread.max.num");
-
-    // http config
     public static final Integer HTTP_REQUEST_TIMEOUT = ConfigUtils.getIntProperty("http.request.timeout");
     public static final Integer HTTP_RESPONSE_TIMEOUT = ConfigUtils.getIntProperty("http.response.timeout");
diff --git a/src/main/java/cn/mesalab/dao/DruidData.java b/src/main/java/cn/mesalab/dao/DruidData.java
index 32ffefc..c2f1cba 100644
--- a/src/main/java/cn/mesalab/dao/DruidData.java
+++ b/src/main/java/cn/mesalab/dao/DruidData.java
@@ -1,18 +1,16 @@
 package cn.mesalab.dao;
 
 import cn.mesalab.config.ApplicationConfig;
-import cn.mesalab.dao.Impl.ResultSetToListServiceImp;
 import cn.mesalab.utils.DruidUtils;
 import io.vavr.Tuple;
 import io.vavr.Tuple2;
-import org.apache.calcite.avatica.AvaticaConnection;
 import org.apache.calcite.avatica.AvaticaStatement;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.sql.ResultSet;
-import java.sql.SQLException;
+import java.sql.ResultSetMetaData;
 import java.util.*;
 import java.util.stream.Collectors;
 
@@ -26,139 +24,51 @@ import java.util.stream.Collectors;
 public class DruidData {
     private static final Logger LOG = LoggerFactory.getLogger(DruidData.class);
 
-    private static DruidData druidData;
-    private AvaticaConnection connection;
-    private AvaticaStatement statement;
-
-    {
-        connectionInit();
+    public static Map<String, List<Map<String, Object>>> readFromDruid(String sql, AvaticaStatement statement){
+        Map<String, List<Map<String, Object>>> rsList = null;
+        try{
+            ResultSet resultSet = DruidUtils.executeQuery(statement, sql);
+            rsList = selectAll(resultSet);
+        } catch (Exception e){
+            e.printStackTrace();
+        }
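+        // Note: rsList stays null when the query throws, so callers need to
+        // null-check the returned map before using it.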
+        return rsList;
     }
 
     /**
-     * Initialize the connection
+     * Reshape the rows read from Druid into Map<String, List<Map<String, Object>>> form:
+     * the outer map is keyed by ip, and each element of the inner list is one log record of that ip.
+     * @param rs
+     * @return
      */
-    private void connectionInit(){
+    public static Map<String, List<Map<String, Object>>> selectAll(ResultSet rs) {
+        Map<String, List<Map<String, Object>>> allIpDataList = new HashMap<>();
+        ArrayList<String> ipList = new ArrayList<>();
         try {
-            connection = DruidUtils.getConn();
-            statement = connection.createStatement();
-            statement.setQueryTimeout(0);
+            ResultSetMetaData rmd = rs.getMetaData();
+            int columnCount = rmd.getColumnCount();
-        } catch (SQLException exception) {
-            exception.printStackTrace();
-        }
-    }
+            while (rs.next()) {
+                Map<String, Object> rowData = new HashMap<>();
+                for (int i = 1; i <= columnCount; ++i) {
+                    rowData.put(rmd.getColumnName(i), rs.getObject(i));
+                }
-    /**
-     * Get the singleton instance
-     * @return DruidData instance
-     */
-    public static DruidData getInstance() {
-        druidData = new DruidData();
-        return druidData;
-    }
-
-    /**
-     * Get the distinct server ips
-     * @return ArrayList of ips
-     */
-    public static ArrayList<String> getServerIpList(AvaticaStatement statement, String timeFilter) {
-        Long startQueryIpLIstTime = System.currentTimeMillis();
-        ArrayList<String> serverIps = new ArrayList<String>();
-        String sql = "SELECT distinct " + ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME
-                + " FROM " + ApplicationConfig.DRUID_TABLE
-                + " WHERE " + timeFilter
-                + " LIMIT 200";// FOR TEST
-        try{
-            ResultSet resultSet = DruidUtils.executeQuery(statement,sql);
-            while(resultSet.next()){
-                String ip = resultSet.getString(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME);
-                serverIps.add(ip);
+                String ip = (String) rowData.get(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME);
+                if(!ipList.contains(ip)){
+                    ipList.add(ip);
+                    List<Map<String, Object>> ipData = new ArrayList<>();
+                    allIpDataList.put(ip, ipData);
+                }
+                rowData.remove(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME);
+                allIpDataList.get(ip).add(rowData);
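+                // Rows are grouped by server ip; the ip column itself is
+                // dropped from each row, so the inner maps hold only the
+                // remaining metric columns.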
             }
-        } catch (Exception e){
-            e.printStackTrace();
+        } catch (Exception ex) {
+            ex.printStackTrace();
         }
-        Long endQueryIpListTime = System.currentTimeMillis();
-        LOG.info("Performance test: ip list query took " + (endQueryIpListTime-startQueryIpLIstTime));
-
-        return serverIps;
-    }
-
-    public static List<String> getServerIpList(List<Map<String, Object>> dataFromDruid) {
-        List<String> serverIps = new ArrayList<>();
-        List<String> collect = dataFromDruid.stream().map(i -> i.get(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME).toString())
-                .collect(Collectors.toList());
-        serverIps = collect.stream().distinct().collect(Collectors.toList());
-        return serverIps;
-    }
-
-    /**
-     * Read the target ips' data from Druid
-     * @param ipList list of ips
-     * @return query result
-     */
-    public static List<Map<String, Object>> readFromDruid(AvaticaConnection connection, AvaticaStatement statement, List<String> ipList, String timeFilter){
-        List<Map<String, Object>> rsList = null;
-        ipList = ipList.stream().map( ip -> "\'"+ip+"\'").collect(Collectors.toList());
-        String ipString = "(" + StringUtils.join(ipList, ",").toString() + ")";
-        String sql = "SELECT "+ ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME
-                + ", "+ ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME
-                + ", "+ ApplicationConfig.BASELINE_METRIC_TYPE
-                + ", " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
-                + " FROM " + ApplicationConfig.DRUID_TABLE
-                + " WHERE " + ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME
-                + " IN " + ipString
-                + " AND " + timeFilter;
-        try{
-            ResultSet resultSet = DruidUtils.executeQuery(statement, sql);
-            ResultSetToListService service = new ResultSetToListServiceImp();
-            rsList = service.selectAll(resultSet);
-        } catch (Exception e){
-            e.printStackTrace();
-        }
-        return rsList;
-    }
-
-    public static List<Map<String, Object>> readFromDruid(String sql, AvaticaStatement statement){
-        List<Map<String, Object>> rsList = null;
-        try{
-            ResultSet resultSet = DruidUtils.executeQuery(statement, sql);
-            ResultSetToListService service = new ResultSetToListServiceImp();
-            rsList = service.selectAll(resultSet);
-        } catch (Exception e){
-            e.printStackTrace();
-        }
-        return rsList;
-    }
-
-    public static List<Map<String, Object>> getBatchData(List<Map<String, Object>> allData, List<String> ipList){
-        ArrayList<Map<String, Object>> rsList = new ArrayList<>();
-        for(Map<String, Object> record: allData){
-            if(ipList.contains(record.get(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME))){
-                rsList.add(record);
-            }
-        }
-        return rsList;
-    }
-
-
-    /**
-     * Filter the query result down to one ip and one attack type
-     * @param allData query result
-     * @param ip target ip
-     * @param attackType target attack type
-     * @return filtered result
-     */
-    public static List<Map<String, Object>> getTimeSeriesData(List<Map<String, Object>> allData, String ip, String attackType){
-        List<Map<String, Object>> rsList = new ArrayList<>();
-        try{
-            rsList = allData.stream().
-                    filter(i->((i.get(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME).equals(ip))
-                    )&&(i.get(ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME).equals(attackType)))
-                    .collect(Collectors.toList());
-        } catch (NullPointerException e){
-        }
-        return rsList;
+        return allIpDataList;
     }
 
     /**
@@ -180,7 +90,7 @@ public class DruidData {
             default:
                 LOG.warn("Druid data read mode is not configured");
         }
-        return Tuple.of(maxTime, minTime);
+        return Tuple.of(minTime, maxTime);
     }
 
     private static long getCurrentDay(int bias) {
@@ -198,32 +108,46 @@
         return getCurrentDay(0);
     }
 
-    /**
-     * Close the current DruidData
-     */
-    public void closeConn(){
-        try {
-            DruidUtils.closeConnection();
-        } catch (SQLException exception) {
-            exception.printStackTrace();
-        }
-    }
-
-    public static String getDruidQuerySql(Long originBeginTime, int currentPart, long timeGrad){
+    public static String getDruidQuerySql(List<String> attackTypeList, Long originBeginTime, int currentPart, long timeGrad){
         long startTime = originBeginTime + currentPart * timeGrad;
         long endTime = originBeginTime + (currentPart+1) * timeGrad;
+        attackTypeList = attackTypeList.stream().map(attack -> "'"+attack+"'").collect(Collectors.toList());
+        String attackList = "(" + StringUtils.join(attackTypeList, ",") + ")";
         String timeFilter = ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
                 + " >= MILLIS_TO_TIMESTAMP(" + startTime
                 + ") AND " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
                 + " < MILLIS_TO_TIMESTAMP(" + endTime + ")";
-        String sql = "SELECT "+ ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME
+        return "SELECT "+ ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME
                 + ", "+ ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME
                 + ", "+ ApplicationConfig.BASELINE_METRIC_TYPE
                 + ", " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
                 + " FROM " + ApplicationConfig.DRUID_TABLE
-                + " WHERE " + timeFilter; // FOR TEST
-        return sql;
+                + " WHERE " + ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME
+                + " IN " + attackList
+                + " AND " + timeFilter;
     }
 
+    /**
+     * Split a map into batches.
+     * @param map source data
+     * @param pageSize number of entries per batch
+     * @return List<Map<K, V>>
+     */
+    public static <K, V> List<Map<K, V>> splitMap(Map<K, V> map, int pageSize){
+        if(map == null || map.isEmpty()){
+            return Collections.emptyList();
+        }
+        List<Map<K, V>> newList = new ArrayList<>();
+        int j = 0;
+        for(K k : map.keySet()){
+            if(j % pageSize == 0) {
+                newList.add(new HashMap<>());
+            }
+            newList.get(newList.size()-1).put(k, map.get(k));
+            j++;
+        }
+        return newList;
+    }
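+    // Usage sketch: with pageSize = 2, a map {a=1, b=2, c=3} splits into
+    // [{a=1, b=2}, {c=3}]. Iteration follows map.keySet(), so batch membership
+    // is not deterministic for a HashMap.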
 }
diff --git a/src/main/java/cn/mesalab/dao/Impl/ResultSetToListServiceImp.java b/src/main/java/cn/mesalab/dao/Impl/ResultSetToListServiceImp.java
deleted file mode 100644
index 7867353..0000000
--- a/src/main/java/cn/mesalab/dao/Impl/ResultSetToListServiceImp.java
+++ /dev/null
@@ -1,44 +0,0 @@
-package cn.mesalab.dao.Impl;
-
-import cn.mesalab.dao.ResultSetToListService;
-
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * @author yjy
- * @version 1.0
- * @date 2021/7/24 4:29 PM
- */
-public class ResultSetToListServiceImp implements ResultSetToListService {
-
-    /**
-     * Return the SELECT query records as a List, one element per record.
-     * Each record is stored in a Map: the String key is the column name and the Object value is the column value.
-     *
-     * @param rs
-     * @return List<Map<String, Object>>
-     */
-    @Override
-    public List<Map<String, Object>> selectAll(ResultSet rs) {
-        List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
-        try {
-            ResultSetMetaData rmd = rs.getMetaData();
-            int columnCount = rmd.getColumnCount();
-            while (rs.next()) {
-                Map<String, Object> rowData = new HashMap<String, Object>();
-                for (int i = 1; i <= columnCount; ++i) {
-                    rowData.put(rmd.getColumnName(i), rs.getObject(i));
-                }
-                list.add(rowData);
-            }
-        } catch (Exception ex) {
-            ex.printStackTrace();
-        }
-        return list;
-    }
-}
\ No newline at end of file
diff --git a/src/main/java/cn/mesalab/dao/ReadHistoricalDruidData.java b/src/main/java/cn/mesalab/dao/ReadHistoricalDruidData.java
index 7a1a936..fb64b81 100644
--- a/src/main/java/cn/mesalab/dao/ReadHistoricalDruidData.java
+++ b/src/main/java/cn/mesalab/dao/ReadHistoricalDruidData.java
@@ -6,7 +6,8 @@ import org.apache.calcite.avatica.AvaticaStatement;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
@@ -16,11 +17,11 @@
  * @version 1.0
  * @date 2021/8/3 8:10 PM
  */
-public class ReadHistoricalDruidData implements Callable<ArrayList<Map<String, Object>>> {
+public class ReadHistoricalDruidData implements Callable<Map<String, List<Map<String, Object>>>> {
     private static final Logger LOG = LoggerFactory.getLogger(ReadHistoricalDruidData.class);
 
-    private String sql;
-    private CountDownLatch countDownLatch;
+    private final String sql;
+    private final CountDownLatch countDownLatch;
 
     public ReadHistoricalDruidData(
             String sql,
@@ -31,15 +32,14 @@ public class ReadHistoricalDruidData implements Callable<ArrayList<Map<String, Object>>> {
     }
 
     @Override
-    public ArrayList<Map<String, Object>> call() {
-        ArrayList<Map<String, Object>> resultData = new ArrayList<>();
+    public Map<String, List<Map<String, Object>>> call() {
+        Map<String, List<Map<String, Object>>> resultData = new HashMap<>();
         try {
             long start = System.currentTimeMillis();
             AvaticaConnection connection = DruidUtils.getConn();
             AvaticaStatement stat = connection.createStatement();
-            resultData.addAll(DruidData.readFromDruid(sql, stat));
-
-
+            Map<String, List<Map<String, Object>>> readFromDruid = DruidData.readFromDruid(sql, stat);
+            resultData.putAll(readFromDruid);
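+            // resultData is keyed by ip, so the size() logged below counts
+            // distinct ips in this time slice rather than raw rows.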
             long end = System.currentTimeMillis();
             LOG.info(sql + "\nRead " + resultData.size() + " entries, elapsed time: " + (end - start));
diff --git a/src/main/java/cn/mesalab/dao/ResultSetToListService.java b/src/main/java/cn/mesalab/dao/ResultSetToListService.java
deleted file mode 100644
index 103e330..0000000
--- a/src/main/java/cn/mesalab/dao/ResultSetToListService.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package cn.mesalab.dao;
-
-import java.sql.ResultSet;
-import java.util.List;
-import java.util.Map;
-
-
-/**
- * @author yjy
- * @version 1.0
- * @date 2021/7/24 4:27 PM
- */
-public interface ResultSetToListService {
-    /**
-     * SELECT * FROM websites
-     * Query all records and return them as a List;
-     * each element of the list is one record.
-     * Each record is stored in a Map: the String key is the column name and the Object value is the column value.
-     *
-     * @param rs
-     * @return List<Map<String, Object>>
-     */
-    public List<Map<String, Object>> selectAll(ResultSet rs);
-}
\ No newline at end of file
diff --git a/src/main/java/cn/mesalab/service/BaselineGeneration.java b/src/main/java/cn/mesalab/service/BaselineGeneration.java
index a911f03..7bc5c4e 100644
--- a/src/main/java/cn/mesalab/service/BaselineGeneration.java
+++ b/src/main/java/cn/mesalab/service/BaselineGeneration.java
@@ -3,18 +3,14 @@ package cn.mesalab.service;
 
 import cn.mesalab.config.ApplicationConfig;
 import cn.mesalab.dao.DruidData;
 import cn.mesalab.dao.ReadHistoricalDruidData;
-import cn.mesalab.utils.DruidUtils;
 import cn.mesalab.utils.HbaseUtils;
-import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.vavr.Tuple2;
-import org.apache.calcite.avatica.AvaticaConnection;
-import org.apache.calcite.avatica.AvaticaStatement;
+import org.apache.commons.collections.ListUtils;
 import org.apache.hadoop.hbase.client.Table;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.sql.SQLException;
 import java.util.*;
 import java.util.concurrent.*;
@@ -30,21 +26,16 @@ public class BaselineGeneration {
     private static final Table hbaseTable = HbaseUtils.getInstance().getHbaseTable();
 
     private static final List<String> ATTACK_TYPE_LIST = Arrays.asList(
-            ApplicationConfig.DRUID_ATTACKTYPE_TCP_SYN_FLOOD,
-            ApplicationConfig.DRUID_ATTACKTYPE_ICMP_FLOOD,
-            ApplicationConfig.DRUID_ATTACKTYPE_UDP_FLOOD,
-            ApplicationConfig.DRUID_ATTACKTYPE_DNS_AMPL
-    );
+            ApplicationConfig.DRUID_ATTACKTYPE_TCP_SYN_FLOOD
+//            ApplicationConfig.DRUID_ATTACKTYPE_ICMP_FLOOD,
+//            ApplicationConfig.DRUID_ATTACKTYPE_UDP_FLOOD,
+//            ApplicationConfig.DRUID_ATTACKTYPE_DNS_AMPL
+    );
     private static final Integer BASELINE_POINT_NUM = ApplicationConfig.BASELINE_RANGE_DAYS * 24 * (60/ApplicationConfig.HISTORICAL_GRAD);
 
     private static final Tuple2<Long, Long> START_END_TIMES = DruidData.getTimeLimit();
-    private static final String TIME_FILTER = ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
-            + " >= MILLIS_TO_TIMESTAMP(" + START_END_TIMES._2
-            + ") AND " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
-            + " < MILLIS_TO_TIMESTAMP(" + START_END_TIMES._1 + ")";
-
-    private static final ArrayList<Map<String, Object>> allFromDruid = new ArrayList<>();
+    private static final Map<String, List<Map<String, Object>>> allFromDruid = new HashMap<>();
 
     /**
      * Program entry
      */
@@ -85,25 +76,27 @@
                 TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024),
                 loadDataThreadFactory, new ThreadPoolExecutor.AbortPolicy());
         long timeGrad = 3600000 * ApplicationConfig.DRUID_READ_BATCH_TIME_GRAD_HOUR;
-        int threadPoolNum = (int) ((START_END_TIMES._1-START_END_TIMES._2)/timeGrad);
-        ArrayList<Future<List<Map<String, Object>>>> resultList = new ArrayList<>();
+        int threadPoolNum = (int) ((START_END_TIMES._2-START_END_TIMES._1)/timeGrad);
+        ArrayList<Future<Map<String, List<Map<String, Object>>>>> resultList = new ArrayList<>();
         CountDownLatch loadDataCountDownLatch = new CountDownLatch(threadPoolNum);
-        for (int i = 0; i < threadNum; i++) {
-            String sql = DruidData.getDruidQuerySql(START_END_TIMES._1, i, timeGrad);
+        for (int i = 0; i < threadPoolNum; i++) {
+            String sql = DruidData.getDruidQuerySql(ATTACK_TYPE_LIST, START_END_TIMES._1, i, timeGrad);
             ReadHistoricalDruidData readHistoricalDruidData = new ReadHistoricalDruidData(
                     sql,
                     loadDataCountDownLatch
             );
-            Future<List<Map<String, Object>>> future = loadDataExecutor.submit(readHistoricalDruidData);
+            Future<Map<String, List<Map<String, Object>>>> future = loadDataExecutor.submit(readHistoricalDruidData);
             resultList.add(future);
         }
         loadDataExecutor.shutdown();
         loadDataCountDownLatch.await();
-        for(Future<List<Map<String, Object>>> future: resultList){
+        for(Future<Map<String, List<Map<String, Object>>>> future: resultList){
             try {
-                if(future.get()!=null){
-                    allFromDruid.addAll(future.get());
+                Map<String, List<Map<String, Object>>> queryBatchIpData = future.get();
+                if(queryBatchIpData !=null){
+                    queryBatchIpData.forEach((ip, data)->
+                            allFromDruid.merge(ip, data, ListUtils::union));
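+                    // ListUtils.union concatenates the two lists, so an ip seen
+                    // in several time slices ends up with one combined record list.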
                }else{
                    LOG.error("future.get() returned no result");
                }
@@ -115,8 +108,6 @@
        LOG.info("Druid data loading took: "+(last-start));
 
        // BaseLine generation
-        // Get the ip list
-        List<String> destinationIps = DruidData.getServerIpList(allFromDruid);
        ThreadFactory generationThreadFactory = new ThreadFactoryBuilder()
                .setNameFormat("baseline-generate-%d").build();
        ThreadPoolExecutor generationExecutor = new ThreadPoolExecutor(
@@ -124,21 +115,19 @@
                TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024),
                generationThreadFactory, new ThreadPoolExecutor.AbortPolicy());
 
-        LOG.info("Found " + destinationIps.size() + " server ips");
+        LOG.info("Found " + allFromDruid.size() + " server ips");
        LOG.info("Baseline batch size: " + ApplicationConfig.BASELINE_GENERATE_BATCH_SIZE);
 
-        // Generate and process ip baselines in batches
-        List<List<String>> batchIpLists = Lists.partition(destinationIps, ApplicationConfig.BASELINE_GENERATE_BATCH_SIZE);
-        CountDownLatch generateCountDownLatch = new CountDownLatch(threadPoolNum);
-        for (List<String> batchIps: batchIpLists){
-            if(batchIps.size()>0){
+
+        List<Map<String, List<Map<String, Object>>>> batchDruidDataLists = DruidData.splitMap(allFromDruid, ApplicationConfig.BASELINE_GENERATE_BATCH_SIZE);
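+        // Each batch now carries its own slice of allFromDruid, so worker
+        // threads no longer re-scan the full data set the way the old
+        // getBatchData path did.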
+        CountDownLatch generateCountDownLatch = new CountDownLatch(batchDruidDataLists.size());
+        for (Map<String, List<Map<String, Object>>> batchDruidData: batchDruidDataLists){
+            if(batchDruidData.size()>0){
                 BaselineSingleThread baselineSingleThread = new BaselineSingleThread(
-                        batchIps,
                         hbaseTable,
                         ATTACK_TYPE_LIST,
                         BASELINE_POINT_NUM,
-                        TIME_FILTER,
-                        allFromDruid,
+                        batchDruidData,
                         generateCountDownLatch
                 );
                 generationExecutor.execute(baselineSingleThread);
diff --git a/src/main/java/cn/mesalab/service/BaselineSingleThread.java b/src/main/java/cn/mesalab/service/BaselineSingleThread.java
index e5ce54c..a1cdfc2 100644
--- a/src/main/java/cn/mesalab/service/BaselineSingleThread.java
+++ b/src/main/java/cn/mesalab/service/BaselineSingleThread.java
@@ -1,12 +1,9 @@
 package cn.mesalab.service;
 
 import cn.mesalab.config.ApplicationConfig;
-import cn.mesalab.dao.DruidData;
 import cn.mesalab.service.algorithm.KalmanFilter;
 import cn.mesalab.utils.HbaseUtils;
 import cn.mesalab.utils.SeriesUtils;
-import org.apache.calcite.avatica.AvaticaConnection;
-import org.apache.calcite.avatica.AvaticaStatement;
 import org.apache.commons.math3.stat.StatUtils;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Table;
@@ -29,43 +26,38 @@ public class BaselineSingleThread extends Thread {
     private static final Logger LOG = LoggerFactory.getLogger(BaselineSingleThread.class);
 
-    private List<String> ipList;
-    private Table hbaseTable;
-    private List<String> attackTypeList;
-    private Integer BASELINE_POINT_NUM;
-    private String timeFilter;
-    private List<Map<String, Object>> batchDruidData;
-    private List<Map<String, Object>> historicalData;
-    private CountDownLatch countDownLatch;
+    private final Table hbaseTable;
+    private final List<String> attackTypeList;
+    private final Integer historicalPointNum;
+    private final Map<String, List<Map<String, Object>>> batchDruidData;
+    private final CountDownLatch countDownLatch;
 
     public BaselineSingleThread(
-            List<String> batchIpList,
             Table hbaseTable,
             List<String> attackTypeList,
-            Integer BASELINE_POINT_NUM,
-            String timeFilter,
-            List<Map<String, Object>> historicalData,
+            Integer baselinePointNum,
+            Map<String, List<Map<String, Object>>> batchDruidData,
             CountDownLatch countDownLatch
     ){
-        this.ipList = batchIpList;
         this.hbaseTable = hbaseTable;
         this.attackTypeList = attackTypeList;
-        this.BASELINE_POINT_NUM = BASELINE_POINT_NUM;
-        this.timeFilter = timeFilter;
-        this.historicalData = historicalData;
+        this.historicalPointNum = baselinePointNum;
+        this.batchDruidData = batchDruidData;
         this.countDownLatch = countDownLatch;
     }
 
     @Override
     public void run(){
-        batchDruidData = DruidData.getBatchData(historicalData, ipList);
-
         List<Put> putList = new ArrayList<>();
         for(String attackType: attackTypeList){
-            for(String ip: ipList){
-                int[] ipBaseline = generateSingleIpBaseline(ip, attackType);
+            for(String ip: batchDruidData.keySet()){
+                // Filter this ip's data down to the given attack type
+                List<Map<String, Object>> ipDruidData = batchDruidData.get(ip).stream()
+                        .filter(i -> i.get(ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME).equals(attackType)).collect(Collectors.toList());
+                // Baseline generation
+                int[] ipBaseline = generateSingleIpBaseline(ipDruidData);
                 if (ipBaseline!= null){
-                    putList = HbaseUtils.cachedInPut(putList, ip, ipBaseline, attackType, ApplicationConfig.BASELINE_METRIC_TYPE);
+                    HbaseUtils.cachedInPut(putList, ip, ipBaseline, attackType, ApplicationConfig.BASELINE_METRIC_TYPE);
                 }
             }
         }
@@ -76,46 +68,40 @@ public class BaselineSingleThread extends Thread {
             e.printStackTrace();
         } finally {
             countDownLatch.countDown();
-            LOG.info("This thread finished reading; remaining threads: " + countDownLatch.getCount());
+            LOG.info("This thread finished processing; remaining threads: " + countDownLatch.getCount());
         }
     }
 
     /**
      * Baseline generation logic for a single ip
-     * @param ip ip
-     * @param attackType attack type
      * @return baseline series, length 60/HISTORICAL_GRAD*24
      */
-    private int[] generateSingleIpBaseline(String ip, String attackType){
-        // Query
-        List<Map<String, Object>> originSeries = DruidData.getTimeSeriesData(batchDruidData, ip, attackType);
-
-        if (originSeries.size()==0){
+    private int[] generateSingleIpBaseline(List<Map<String, Object>> ipDruidData){
+        if (ipDruidData.size()==0){
             return null;
         }
 
         // Fill missing points in the time series with 0
-        List<Map<String, Object>> completSeries = SeriesUtils.complementSeries(originSeries);
+        List<Map<String, Object>> completSeries = SeriesUtils.complementSeries(ipDruidData);
 
-        int[] baselineArr = new int[BASELINE_POINT_NUM];
+        int[] baselineArr = new int[historicalPointNum];
         List<Integer> series = completSeries.stream().map( i ->
                 Integer.valueOf(i.get(ApplicationConfig.BASELINE_METRIC_TYPE).toString())).collect(Collectors.toList());
 
         // Check how often the ip appears
-        if(originSeries.size()/(float)completSeries.size()>ApplicationConfig.BASELINE_HISTORICAL_RATIO){
+        if(ipDruidData.size()/(float)completSeries.size()>ApplicationConfig.BASELINE_HISTORICAL_RATIO){
             // High frequency
             double percentile = StatUtils.percentile(series.stream().mapToDouble(Double::valueOf).toArray(),
                     ApplicationConfig.BASELINE_SPARSE_FILL_PERCENTILE);
             Arrays.fill(baselineArr, (int)percentile);
             baselineArr = baselineFunction(series);
-
         } else {
             // Check periodicity
             if (SeriesUtils.isPeriod(series)){
                 baselineArr = baselineFunction(series);
             } else {
                 int ipPercentile = SeriesUtils.percentile(
-                        originSeries.stream().map(i ->
+                        ipDruidData.stream().map(i ->
                                 Integer.valueOf(i.get(ApplicationConfig.BASELINE_METRIC_TYPE).toString())).collect(Collectors.toList()),
                         ApplicationConfig.BASELINE_RATIONAL_PERCENTILE);
                 Arrays.fill(baselineArr, ipPercentile);
@@ -135,11 +121,11 @@ public class BaselineSingleThread extends Thread {
         switch (ApplicationConfig.BASELINE_FUNCTION){
             case "KalmanFilter":
                 KalmanFilter kalmanFilter = new KalmanFilter();
-                kalmanFilter.forcast(timeSeries, BASELINE_POINT_NUM);
+                kalmanFilter.forcast(timeSeries, historicalPointNum);
                 result = kalmanFilter.getForecastSeries().stream().mapToInt(Integer::valueOf).toArray();
                 break;
             default:
-                result = timeSeries.subList(0, BASELINE_POINT_NUM).stream().mapToInt(Integer::valueOf).toArray();
+                result = timeSeries.subList(0, historicalPointNum).stream().mapToInt(Integer::valueOf).toArray();
         }
         return result;
     }
diff --git a/src/main/java/cn/mesalab/utils/SeriesUtils.java b/src/main/java/cn/mesalab/utils/SeriesUtils.java
index b01678e..7d67f87 100644
--- a/src/main/java/cn/mesalab/utils/SeriesUtils.java
+++ b/src/main/java/cn/mesalab/utils/SeriesUtils.java
@@ -50,9 +50,9 @@ public class SeriesUtils {
      * Complete the time series (fill in missing timestamps)
      */
     public static List<Map<String, Object>> complementSeries(List<Map<String, Object>> originSeries){
-        LocalDateTime startTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(druidData.getTimeLimit()._2), TimeZone
+        LocalDateTime startTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(DruidData.getTimeLimit()._1), TimeZone
                 .getDefault().toZoneId());
-        LocalDateTime endTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(druidData.getTimeLimit()._1), TimeZone
+        LocalDateTime endTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(DruidData.getTimeLimit()._2), TimeZone
                 .getDefault().toZoneId());
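+        // getTimeLimit() now returns (minTime, maxTime): _1 is the window start
+        // and _2 the end, where the old code read the tuple in reverse order.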
 
         List<LocalDateTime> dateList = completionDate(startTime, endTime);
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index fb82400..419cee5 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -26,6 +26,7 @@
 read.druid.time.limit.type=1
 read.druid.min.time=1625414400000
 #06-01
 #read.druid.min.time=1622476800000
+#07-08
 read.druid.max.time=1625673600000
 
 #Read the past N days of data; the minimum is 3 days (needed for periodicity detection)