diff --git a/src/main/java/cn/mesalab/config/ApplicationConfig.java b/src/main/java/cn/mesalab/config/ApplicationConfig.java
index 68b8c50..531f0ff 100644
--- a/src/main/java/cn/mesalab/config/ApplicationConfig.java
+++ b/src/main/java/cn/mesalab/config/ApplicationConfig.java
@@ -50,6 +50,8 @@ public class ApplicationConfig {
     public static final Integer LOG_WRITE_COUNT = ConfigUtils.getIntProperty("log.write.count");
     public static final Integer GENERATE_BATCH_SIZE = ConfigUtils.getIntProperty("generate.batch.size");
 
+    public static final Integer THREAD_MAX_NUM = ConfigUtils.getIntProperty("thread.max.num");
+
     // http config
diff --git a/src/main/java/cn/mesalab/dao/DruidData.java b/src/main/java/cn/mesalab/dao/DruidData.java
index 8540bf0..32ffefc 100644
--- a/src/main/java/cn/mesalab/dao/DruidData.java
+++ b/src/main/java/cn/mesalab/dao/DruidData.java
@@ -13,10 +13,7 @@ import org.slf4j.LoggerFactory;
 
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.stream.Collectors;
 
@@ -87,6 +84,14 @@ public class DruidData {
         return serverIps;
     }
 
+    public static List<String> getServerIpList(List<Map<String, Object>> dataFromDruid) {
+        List<String> serverIps = new ArrayList<>();
+        List<String> collect = dataFromDruid.stream().map(i -> i.get(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME).toString())
+                .collect(Collectors.toList());
+        serverIps = collect.stream().distinct().collect(Collectors.toList());
+        return serverIps;
+    }
+
     /**
      * 从Druid读取目标IP相关数据
      * @param ipList ip列表
@@ -114,6 +119,29 @@
         return rsList;
     }
 
+    public static List<Map<String, Object>> readFromDruid(String sql, AvaticaStatement statement){
+        List<Map<String, Object>> rsList = null;
+        try{
+            ResultSet resultSet = DruidUtils.executeQuery(statement, sql);
+            ResultSetToListService service = new ResultSetToListServiceImp();
+            rsList = service.selectAll(resultSet);
+        } catch (Exception e){
+            e.printStackTrace();
+        }
+        return rsList;
+    }
+
+    public static List<Map<String, Object>> getBatchData(List<Map<String, Object>> allData, List<String> ipList){
+        ArrayList<Map<String, Object>> rsList = new ArrayList<>();
+        for(Map<String, Object> record: allData){
+            if(ipList.contains(record.get(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME))){
+                rsList.add(record);
+            }
+        }
+        return rsList;
+    }
+
+
     /**
      * 从数据库读取结果中筛选指定ip的指定攻击类型的数据
      * @param allData 数据库读取结果
@@ -180,4 +208,22 @@
             exception.printStackTrace();
         }
     }
+
+    public static String getDruidQuerySql(Long originBeginTime, int currentPart, long timeGrad){
+        long startTime = originBeginTime + currentPart * timeGrad;
+        long endTime = originBeginTime + (currentPart+1) * timeGrad;
+        String timeFilter = ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
+                + " >= MILLIS_TO_TIMESTAMP(" + startTime
+                + ") AND " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
+                + " < MILLIS_TO_TIMESTAMP(" + endTime + ")";
+
+        String sql = "SELECT "+ ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME
+                + ", "+ ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME
+                + ", "+ ApplicationConfig.BASELINE_METRIC_TYPE
+                + ", " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
+                + " FROM " + ApplicationConfig.DRUID_TABLE
+                + " WHERE " + timeFilter; // FOR TEST -- NOTE(review): integer division in timeGrad drops the remainder, so the last partition may miss the tail of the time range — confirm
+        return sql;
+    }
+
 }
diff --git a/src/main/java/cn/mesalab/dao/ReadHistoricalDruidData.java b/src/main/java/cn/mesalab/dao/ReadHistoricalDruidData.java
new file mode 100644
index 0000000..5b15ece
--- /dev/null
+++ b/src/main/java/cn/mesalab/dao/ReadHistoricalDruidData.java
@@ -0,0 +1,46 @@
+package cn.mesalab.dao;
+
+import org.apache.calcite.avatica.AvaticaStatement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * @author yjy
+ * @version 1.0
+ * @date 2021/8/3 8:10 下午
+ */
+public class ReadHistoricalDruidData implements Callable<ArrayList<Map<String, Object>>> {
+    private static final Logger LOG = LoggerFactory.getLogger(ReadHistoricalDruidData.class);
+
+    private String sql;
+    private
AvaticaStatement statement;
+
+    public ReadHistoricalDruidData(
+            String sql,
+            AvaticaStatement statement
+    ){
+        this.sql = sql;
+        this.statement = statement;
+    }
+
+    @Override
+    public ArrayList<Map<String, Object>> call() {
+        ArrayList<Map<String, Object>> resultData = new ArrayList<>();
+        try {
+            long start = System.currentTimeMillis();
+
+            resultData.addAll(DruidData.readFromDruid(sql, statement));
+            long end = System.currentTimeMillis();
+            LOG.info(sql + "\n读取" + resultData.size() + "条数据,运行时间:" + (end - start));
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return resultData;
+    }
+
+}
diff --git a/src/main/java/cn/mesalab/service/BaselineGeneration.java b/src/main/java/cn/mesalab/service/BaselineGeneration.java
index d46b45e..1f2acc7 100644
--- a/src/main/java/cn/mesalab/service/BaselineGeneration.java
+++ b/src/main/java/cn/mesalab/service/BaselineGeneration.java
@@ -2,10 +2,12 @@ package cn.mesalab.service;
 
 import cn.mesalab.config.ApplicationConfig;
 import cn.mesalab.dao.DruidData;
+import cn.mesalab.dao.ReadHistoricalDruidData;
 import cn.mesalab.utils.DruidUtils;
 import cn.mesalab.utils.HbaseUtils;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.vavr.Tuple2;
 import org.apache.calcite.avatica.AvaticaConnection;
 import org.apache.calcite.avatica.AvaticaStatement;
 import org.apache.hadoop.hbase.client.Table;
@@ -47,10 +49,13 @@ public class BaselineGeneration {
     private static final Integer BASELINE_POINT_NUM =
             ApplicationConfig.BASELINE_RANGE_DAYS * 24 * (60/ApplicationConfig.HISTORICAL_GRAD);
 
+    private static Tuple2<Long, Long> startEndTimes = DruidData.getTimeLimit();
     private static String timeFilter = ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
-            + " >= MILLIS_TO_TIMESTAMP(" + DruidData.getTimeLimit()._2
+            + " >= MILLIS_TO_TIMESTAMP(" + startEndTimes._2
             + ") AND " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
-            + " < MILLIS_TO_TIMESTAMP(" + DruidData.getTimeLimit()._1 + ")";
+            + " < MILLIS_TO_TIMESTAMP(" + startEndTimes._1 + ")";
+
+    private static ArrayList<Map<String, Object>> allFromDruid = new ArrayList<>();
 
     /**
      * 程序执行
@@ -80,23 +85,47 @@ public class BaselineGeneration {
      * @throws InterruptedException
      */
     private void generateBaselinesThread() throws InterruptedException {
-        int threadNum = Runtime.getRuntime().availableProcessors();
+        int threadNum = ApplicationConfig.THREAD_MAX_NUM;
 
-        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
-                .setNameFormat("baseline-demo-%d").build();
-
-        // 创建线程池
-        ThreadPoolExecutor executor = new ThreadPoolExecutor(
-                threadNum,
-                threadNum,
-                0L,
-                TimeUnit.MILLISECONDS,
-                new LinkedBlockingQueue<>(1024),
-                namedThreadFactory,
+        // 数据读取
+        LOG.info("Druid 开始读取数据");
+        long start = System.currentTimeMillis();
+        // allFromDruid = DruidData.readAllFromDruid(druidConn, druidStatement, timeFilter);
+        ThreadFactory loadDataThreadFactory = new ThreadFactoryBuilder()
+                .setNameFormat("baseline-load-data-%d").build();
+        ThreadPoolExecutor loadDataExecutor = new ThreadPoolExecutor(
+                threadNum, threadNum, 0L,
+                TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), loadDataThreadFactory,
                 new ThreadPoolExecutor.AbortPolicy());
+        long timeGrad = (startEndTimes._1 - startEndTimes._2)/threadNum;
+        for (int i = 0; i < threadNum; i++) {
+            String sql = DruidData.getDruidQuerySql(startEndTimes._2, i, timeGrad); // fixed: was startEndTimes._1 (the upper bound) — per timeFilter, _2 is the lower bound, so partitions must start at _2; passing _1 queries beyond the data window
+            ReadHistoricalDruidData readHistoricalDruidData = new ReadHistoricalDruidData(
+                    sql,
+                    druidStatement
+            );
+            Future<ArrayList<Map<String, Object>>> future = loadDataExecutor.submit(readHistoricalDruidData); // NOTE(review): future.get() immediately after submit makes the load effectively sequential; parallelizing would require one AvaticaStatement per task, since all tasks currently share druidStatement (not thread-safe — confirm)
+            try {
+                allFromDruid.addAll(future.get());
+            } catch (ExecutionException e) {
+                e.printStackTrace();
+            }
+        }
+        long last = System.currentTimeMillis();
+        LOG.info("Druid 加载数据共耗时:"+(last-start));
+        loadDataExecutor.shutdown();
+        loadDataExecutor.awaitTermination(10L, TimeUnit.HOURS);
 
-        // IP列表获取
-        ArrayList<String> destinationIps = DruidData.getServerIpList(druidStatement, timeFilter);
+
+        // BaseLine生成
+        // 获取IP列表
+        List<String> destinationIps = DruidData.getServerIpList(allFromDruid);
+        ThreadFactory
generationThreadFactory = new ThreadFactoryBuilder()
+                .setNameFormat("baseline-generation-%d").build(); // fixed: was "baseline-load-data-%d", duplicating the load pool's thread-name format
+        ThreadPoolExecutor generationExecutor = new ThreadPoolExecutor(
+                threadNum, threadNum, 0L,
+                TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), generationThreadFactory,
+                new ThreadPoolExecutor.AbortPolicy());
 
         LOG.info("共查询到服务端ip " +destinationIps.size() + " 个");
         LOG.info("Baseline batch 大小: " + ApplicationConfig.GENERATE_BATCH_SIZE);
@@ -106,21 +135,22 @@
 
         for (List<String> batchIps: batchIpLists){
             if(batchIps.size()>0){
-                BaselineSingleThread testForInsider = new BaselineSingleThread(
+                BaselineSingleThread baselineSingleThread = new BaselineSingleThread(
                         batchIps,
                         druidConn,
                         druidStatement,
                         hbaseTable,
                         attackTypeList,
                         BASELINE_POINT_NUM,
-                        timeFilter
+                        timeFilter,
+                        allFromDruid
                 );
-                executor.execute(testForInsider);
+                generationExecutor.execute(baselineSingleThread);
             }
         }
 
-        executor.shutdown();
-        executor.awaitTermination(10L, TimeUnit.HOURS);
+        generationExecutor.shutdown();
+        generationExecutor.awaitTermination(10L, TimeUnit.HOURS);
     }
 }
diff --git a/src/main/java/cn/mesalab/service/BaselineSingleThread.java b/src/main/java/cn/mesalab/service/BaselineSingleThread.java
index 988355a..19bdcd5 100644
--- a/src/main/java/cn/mesalab/service/BaselineSingleThread.java
+++ b/src/main/java/cn/mesalab/service/BaselineSingleThread.java
@@ -36,6 +36,7 @@ public class BaselineSingleThread extends Thread {
     private Integer BASELINE_POINT_NUM;
     private String timeFilter;
     private List<Map<String, Object>> batchDruidData;
+    private List<Map<String, Object>> historicalData;
 
     public BaselineSingleThread(
             List<String> batchIpList,
@@ -44,7 +45,8 @@
             Table hbaseTable,
             List<String> attackTypeList,
             Integer BASELINE_POINT_NUM,
-            String timeFilter
+            String timeFilter,
+            List<Map<String, Object>> historicalData
     ){
         this.ipList = batchIpList;
         this.druidConn = druidConn;
@@ -53,11 +55,13 @@
         this.attackTypeList = attackTypeList;
this.BASELINE_POINT_NUM = BASELINE_POINT_NUM;
         this.timeFilter = timeFilter;
+        this.historicalData = historicalData;
     }
 
     @Override
     public void run(){
-        batchDruidData = DruidData.readFromDruid(druidConn, druidStatement, ipList, timeFilter);
+//        batchDruidData = DruidData.readFromDruid(druidConn, druidStatement, ipList, timeFilter);
+        batchDruidData = DruidData.getBatchData(historicalData, ipList);
 
         List<Put> putList = new ArrayList<>();
         for(String attackType: attackTypeList){
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index 42c9c32..1ee2d0c 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -23,14 +23,16 @@ hbase.zookeeper.client.port=2181
 
 #读取druid时间范围方式,0:读取默认范围read.druid.time.range天数;1:指定时间范围
 read.druid.time.limit.type=1
 #07-01
-read.druid.min.time=1625068800000
+read.druid.min.time=1625414400000
 #06-01
 #read.druid.min.time=1622476800000
 read.druid.max.time=1625673600000
 
+thread.max.num=5
+
 #读取过去N天数据,最小值为3天(需要判断周期性)
-read.historical.days=7
+read.historical.days=3
 #历史数据汇聚粒度为10分钟
 historical.grad=10
 #baseline生成方法