diff --git a/src/main/java/cn/mesalab/config/ApplicationConfig.java b/src/main/java/cn/mesalab/config/ApplicationConfig.java
index 97b8322..dab7c8b 100644
--- a/src/main/java/cn/mesalab/config/ApplicationConfig.java
+++ b/src/main/java/cn/mesalab/config/ApplicationConfig.java
@@ -31,12 +31,12 @@ public class ApplicationConfig {
     public static final String DRUID_ATTACKTYPE_COLUMN_NAME = ConfigUtils.getStringProperty("druid.attacktype.columnname");
     public static final String DRUID_RECVTIME_COLUMN_NAME = ConfigUtils.getStringProperty("druid.recvtime.columnname");
 
-    public static final float BASELINE_PERIOD_CORR_THRE = ConfigUtils.getFloatProperty("baseline.period.correlative.threshold");
-    public static final float BASELINE_HISTORICAL_RATIO = ConfigUtils.getFloatProperty("baseline.historical.ratio.threshold");
-    public static final float BASELINE_SPARSE_FILL_PERCENTILE = ConfigUtils.getFloatProperty("baseline.historical.sparse.fill.percentile");
+    public static final Float BASELINE_PERIOD_CORR_THRE = ConfigUtils.getFloatProperty("baseline.period.correlative.threshold");
+    public static final Float BASELINE_HISTORICAL_RATIO = ConfigUtils.getFloatProperty("baseline.historical.ratio.threshold");
+    public static final Float BASELINE_SPARSE_FILL_PERCENTILE = ConfigUtils.getFloatProperty("baseline.historical.sparse.fill.percentile");
     public static final String BASELINE_FUNCTION = ConfigUtils.getStringProperty("baseline.function");
     public static final Integer BASELINE_RANGE_DAYS = ConfigUtils.getIntProperty("baseline.range.days");
-    public static final float BASELINE_RATIONAL_PERCENTILE = ConfigUtils.getFloatProperty("baseline.rational.percentile");
+    public static final Float BASELINE_RATIONAL_PERCENTILE = ConfigUtils.getFloatProperty("baseline.rational.percentile");
 
     public static final String HBASE_TABLE = ConfigUtils.getStringProperty("hbase.table");
 
@@ -47,15 +47,13 @@ public class ApplicationConfig {
 
     public static final Double BASELINE_KALMAN_P = ConfigUtils.getDoubleProperty("baseline.kalman.p");
     public static final Double BASELINE_KALMAN_R = ConfigUtils.getDoubleProperty("baseline.kalman.r");
 
-    public static final Integer LOG_WRITE_COUNT = ConfigUtils.getIntProperty("log.write.count");
-    public static final Integer GENERATE_BATCH_SIZE = ConfigUtils.getIntProperty("generate.batch.size");
-
+    public static final Integer BASELINE_GENERATE_BATCH_SIZE = ConfigUtils.getIntProperty("baseline.generate.batch.size");
+    public static final Long DRUID_READ_BATCH_TIME_GRAD_HOUR = ConfigUtils.getLongProperty("druid.read.batch.time.grad.hour");
     public static final Integer THREAD_MAX_NUM = ConfigUtils.getIntProperty("thread.max.num");
 
     // http config
-    public static final Integer HTTP_REQUEST_TIMEOUT = ConfigUtils.getIntProperty("http.request.timeout");
     public static final Integer HTTP_RESPONSE_TIMEOUT = ConfigUtils.getIntProperty("http.response.timeout");
     public static final Integer HTTP_CONNECTION_TIMEOUT = ConfigUtils.getIntProperty("http.connection.timeout");
diff --git a/src/main/java/cn/mesalab/dao/ReadHistoricalDruidData.java b/src/main/java/cn/mesalab/dao/ReadHistoricalDruidData.java
index 5b15ece..e483953 100644
--- a/src/main/java/cn/mesalab/dao/ReadHistoricalDruidData.java
+++ b/src/main/java/cn/mesalab/dao/ReadHistoricalDruidData.java
@@ -1,5 +1,7 @@
 package cn.mesalab.dao;
 
+import cn.mesalab.utils.DruidUtils;
+import org.apache.calcite.avatica.AvaticaConnection;
 import org.apache.calcite.avatica.AvaticaStatement;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -7,14 +9,13 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.Map;
 import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
 
 /**
  * @author yjy
  * @version 1.0
  * @date 2021/8/3 8:10 PM
  */
-public class ReadHistoricalDruidData implements Callable {
+public class ReadHistoricalDruidData implements Callable<ArrayList<Map<String, Object>>> {
 
     private static final Logger LOG = LoggerFactory.getLogger(ReadHistoricalDruidData.class);
     private String sql;
@@ -33,8 +34,9 @@ public class ReadHistoricalDruidData implements Callable {
         ArrayList<Map<String, Object>> resultData = new ArrayList<>();
         try {
             long start = System.currentTimeMillis();
-
-            resultData.addAll(DruidData.readFromDruid(sql, statement));
+            AvaticaConnection connection = DruidUtils.getConn();
+            AvaticaStatement stat = connection.createStatement();
+            resultData.addAll(DruidData.readFromDruid(sql, stat));
             long end = System.currentTimeMillis();
             LOG.info(sql + "\nRead " + resultData.size() + " rows, elapsed: " + (end - start));
         } catch (Exception e) {
diff --git a/src/main/java/cn/mesalab/service/BaselineGeneration.java b/src/main/java/cn/mesalab/service/BaselineGeneration.java
index 5a700e8..dc5c3bc 100644
--- a/src/main/java/cn/mesalab/service/BaselineGeneration.java
+++ b/src/main/java/cn/mesalab/service/BaselineGeneration.java
@@ -27,7 +27,7 @@ import java.util.concurrent.*;
 public class BaselineGeneration {
     private static final Logger LOG = LoggerFactory.getLogger(BaselineGeneration.class);
 
-    private static AvaticaConnection druidConn = DruidUtils.getConn();
+    private static final AvaticaConnection druidConn = DruidUtils.getConn();
     private static AvaticaStatement druidStatement;
 
     static {
@@ -38,9 +38,9 @@ public class BaselineGeneration {
         }
     }
 
-    private static Table hbaseTable = HbaseUtils.getInstance().getHbaseTable();
+    private static final Table hbaseTable = HbaseUtils.getInstance().getHbaseTable();
 
-    private static List<String> attackTypeList = Arrays.asList(
+    private static final List<String> ATTACK_TYPE_LIST = Arrays.asList(
             ApplicationConfig.DRUID_ATTACKTYPE_TCP_SYN_FLOOD,
             ApplicationConfig.DRUID_ATTACKTYPE_ICMP_FLOOD,
             ApplicationConfig.DRUID_ATTACKTYPE_UDP_FLOOD,
@@ -49,13 +49,13 @@ public class BaselineGeneration {
     private static final Integer BASELINE_POINT_NUM = ApplicationConfig.BASELINE_RANGE_DAYS * 24 * (60/ApplicationConfig.HISTORICAL_GRAD);
 
-    private static Tuple2<Long, Long> startEndTimes = DruidData.getTimeLimit();
-    private static String timeFilter = ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
-            + " >= MILLIS_TO_TIMESTAMP(" + startEndTimes._2
+    private static final Tuple2<Long, Long> START_END_TIMES = DruidData.getTimeLimit();
+    private static final String TIME_FILTER = ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
+            + " >= MILLIS_TO_TIMESTAMP(" + START_END_TIMES._2
             + ") AND " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
-            + " < MILLIS_TO_TIMESTAMP(" + startEndTimes._1 + ")";
+            + " < MILLIS_TO_TIMESTAMP(" + START_END_TIMES._1 + ")";
 
-    private static ArrayList<Map<String, Object>> allFromDruid = new ArrayList<>();
+    private static final ArrayList<Map<String, Object>> allFromDruid = new ArrayList<>();
 
     /**
      * Program execution
      */
@@ -90,27 +90,35 @@
         // Read data from Druid
         LOG.info("Druid: start loading data");
         long start = System.currentTimeMillis();
-        // allFromDruid = DruidData.readAllFromDruid(druidConn, druidStatement, timeFilter);
         ThreadFactory loadDataThreadFactory = new ThreadFactoryBuilder()
                 .setNameFormat("baseline-load-data-%d").build();
         ThreadPoolExecutor loadDataExecutor = new ThreadPoolExecutor(
                 threadNum, threadNum, 0L, TimeUnit.MILLISECONDS,
                 new LinkedBlockingQueue<>(1024), loadDataThreadFactory, new ThreadPoolExecutor.AbortPolicy());
-        long timeGrad = (startEndTimes._1 - startEndTimes._2)/threadNum;
-        for (int i = 0; i < threadNum; i++) {
-            String sql = DruidData.getDruidQuerySql(startEndTimes._1, i, timeGrad);
+        long timeGrad = 3600000 * ApplicationConfig.DRUID_READ_BATCH_TIME_GRAD_HOUR;
+        ArrayList<Future<ArrayList<Map<String, Object>>>> resultList = new ArrayList<>();
+        for (int i = 0; i < (START_END_TIMES._1-START_END_TIMES._2)/timeGrad; i++) {
+            String sql = DruidData.getDruidQuerySql(START_END_TIMES._1, i, timeGrad);
             ReadHistoricalDruidData readHistoricalDruidData = new ReadHistoricalDruidData(
                     sql, druidStatement
             );
-            Future<ArrayList<Map<String, Object>>> future = loadDataExecutor.submit(readHistoricalDruidData);
+            Future<ArrayList<Map<String, Object>>> future = loadDataExecutor.submit(readHistoricalDruidData);
+            resultList.add(future);
+        }
+        for(Future<ArrayList<Map<String, Object>>> future: resultList){
             try {
-                allFromDruid.addAll(future.get());
+                if(future.get()!=null){
+                    allFromDruid.addAll(future.get());
+                }else{
+                    LOG.error("future.get() returned no result");
+                }
             } catch (ExecutionException e) {
                 e.printStackTrace();
             }
         }
+
         long last = System.currentTimeMillis();
         LOG.info("Druid data loading took: "+(last-start));
         loadDataExecutor.shutdown();
@@ -128,10 +136,10 @@
                 new ThreadPoolExecutor.AbortPolicy());
 
         LOG.info("Queried " + destinationIps.size() + " server-side IPs in total");
-        LOG.info("Baseline batch size: " + ApplicationConfig.GENERATE_BATCH_SIZE);
+        LOG.info("Baseline batch size: " + ApplicationConfig.BASELINE_GENERATE_BATCH_SIZE);
 
         // Generate and process IP baselines in batches
-        List<List<String>> batchIpLists = Lists.partition(destinationIps, ApplicationConfig.GENERATE_BATCH_SIZE);
+        List<List<String>> batchIpLists = Lists.partition(destinationIps, ApplicationConfig.BASELINE_GENERATE_BATCH_SIZE);
 
         for (List<String> batchIps: batchIpLists){
             if(batchIps.size()>0){
@@ -140,9 +148,9 @@
                         druidConn,
                         druidStatement,
                         hbaseTable,
-                        attackTypeList,
+                        ATTACK_TYPE_LIST,
                         BASELINE_POINT_NUM,
-                        timeFilter,
+                        TIME_FILTER,
                         allFromDruid
                 );
                 generationExecutor.execute(baselineSingleThread);
diff --git a/src/main/java/cn/mesalab/service/BaselineSingleThread.java b/src/main/java/cn/mesalab/service/BaselineSingleThread.java
index 19bdcd5..559eee1 100644
--- a/src/main/java/cn/mesalab/service/BaselineSingleThread.java
+++ b/src/main/java/cn/mesalab/service/BaselineSingleThread.java
@@ -74,7 +74,7 @@ public class BaselineSingleThread extends Thread {
         }
         try {
             hbaseTable.put(putList);
-            LOG.info("Baseline thread " + Thread.currentThread().getId() + " successfully wrote baseline records: " + putList.size());
+            LOG.info("Successfully wrote baseline records: " + putList.size());
         } catch (IOException e) {
             e.printStackTrace();
         }
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index b1494c5..8bf913c 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -22,15 +22,12 @@ hbase.zookeeper.client.port=2181
 
 #How the Druid time range is selected: 0 = default range of read.druid.time.range days; 1 = explicit time range
 read.druid.time.limit.type=1
-#07-01
+#07-05
 read.druid.min.time=1625414400000
 #06-01
 #read.druid.min.time=1622476800000
 read.druid.max.time=1625673600000
 
-thread.max.num=5
-
-
 #Read the past N days of data; minimum 3 days (needed for periodicity checks)
 read.historical.days=3
 #Historical data is aggregated at 10-minute granularity
@@ -52,11 +49,10 @@ baseline.rational.percentile=0.95
 
 baseline.kalman.p=0.000001
 baseline.kalman.r=4
-
-# Print a log entry for every 1000 updated records
-log.write.count=10000
 
 # FOR TEST
-generate.batch.size=100
+baseline.generate.batch.size=1000
+druid.read.batch.time.grad.hour=4
+thread.max.num=20
 
 # http client configuration
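
For context on the BaselineGeneration change above: the historical read is now split into fixed batches of druid.read.batch.time.grad.hour hours, one Callable per batch, and the collected Futures are drained only after every batch has been submitted. Below is a minimal, self-contained sketch of that pattern; BatchedReadSketch, readRange, and queryForSlice are hypothetical names standing in for the project's DruidData helpers, not code from this repository.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public final class BatchedReadSketch {

    // Placeholder for the real query; in the patch this corresponds to
    // DruidData.getDruidQuerySql(...) followed by DruidData.readFromDruid(...).
    static List<Map<String, Object>> queryForSlice(long sliceStartMillis, long sliceEndMillis) {
        return new ArrayList<>();
    }

    public static List<Map<String, Object>> readRange(long minMillis, long maxMillis,
                                                      long batchHours, int threads)
            throws InterruptedException {
        long sliceMillis = batchHours * 3_600_000L;
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        List<Future<List<Map<String, Object>>>> futures = new ArrayList<>();

        // Submit one task per fixed-size time slice. As in the patched loop, a trailing
        // remainder smaller than one full slice is not queried.
        for (long sliceStart = minMillis; sliceStart + sliceMillis <= maxMillis; sliceStart += sliceMillis) {
            long from = sliceStart;
            long to = sliceStart + sliceMillis;
            Callable<List<Map<String, Object>>> task = () -> queryForSlice(from, to);
            futures.add(pool.submit(task));
        }

        // Drain the futures only after all slices are queued, so one slow slice
        // does not delay the submission of the others.
        List<Map<String, Object>> all = new ArrayList<>();
        for (Future<List<Map<String, Object>>> f : futures) {
            try {
                List<Map<String, Object>> batch = f.get();
                if (batch != null) {
                    all.addAll(batch);
                }
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        pool.shutdown();
        return all;
    }
}

Collecting the Futures before calling get() keeps all pool threads busy; the previous version blocked on each Future inside the submit loop, which effectively serialized the reads.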