diff --git a/src/main/java/cn/mesalab/config/ApplicationConfig.java b/src/main/java/cn/mesalab/config/ApplicationConfig.java index b55042e..1558de5 100644 --- a/src/main/java/cn/mesalab/config/ApplicationConfig.java +++ b/src/main/java/cn/mesalab/config/ApplicationConfig.java @@ -59,7 +59,11 @@ public class ApplicationConfig { public static final Double BASELINE_KALMAN_P = ConfigUtils.getDoubleProperty("baseline.kalman.p"); public static final Double BASELINE_KALMAN_M = ConfigUtils.getDoubleProperty("baseline.kalman.m"); - public static final Integer THREAD_POOL_NUM = ConfigUtils.getIntProperty("thread.pool.num"); + public static final Integer ALL_PARTITION_NUM = ConfigUtils.getIntProperty("all.partition.num"); + public static final Integer MAX_POOL_SIZE = ConfigUtils.getIntProperty("max.pool.size"); + public static final Integer CORE_POOL_SIZE = ConfigUtils.getIntProperty("core.pool.size"); + + public static final Integer PARTITION_NUM_MAX = ConfigUtils.getIntProperty("druid.partition.num.max"); public static final Integer DRUID_STATEMENT_QUERY_TIMEOUT = ConfigUtils.getIntProperty("druid.statement.query.timeout"); diff --git a/src/main/java/cn/mesalab/service/BaselineGeneration.java b/src/main/java/cn/mesalab/service/BaselineGeneration.java index d15be5d..1bd0e4d 100644 --- a/src/main/java/cn/mesalab/service/BaselineGeneration.java +++ b/src/main/java/cn/mesalab/service/BaselineGeneration.java @@ -2,11 +2,8 @@ package cn.mesalab.service; import cn.mesalab.config.ApplicationConfig; import cn.mesalab.dao.DruidData; -import cn.mesalab.utils.HbaseUtils; import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.vavr.Tuple2; -import org.apache.commons.collections.ListUtils; -import org.apache.hadoop.hbase.client.Table; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,9 +29,9 @@ public class BaselineGeneration { ApplicationConfig.BASELINE_RANGE_DAYS * 24 * (60/ApplicationConfig.HISTORICAL_GRAD); private static final Tuple2 START_END_TIMES = DruidData.getTimeLimit(); - private static final int threadPoolNum = ApplicationConfig.THREAD_POOL_NUM; + private static final int allPartitionNum = ApplicationConfig.ALL_PARTITION_NUM; // 每个线程读取数据所覆盖的partition_num个数 - private static final int batchPartitionRange = (int) Math.ceil(ApplicationConfig.PARTITION_NUM_MAX /(double)threadPoolNum); + private static final int batchPartitionRange = (int) Math.ceil(ApplicationConfig.PARTITION_NUM_MAX /(double) allPartitionNum); /** * 程序执行 @@ -54,16 +51,16 @@ public class BaselineGeneration { } private static void baselineGeneration() throws InterruptedException { - CountDownLatch generateCountDownLatch = new CountDownLatch(threadPoolNum); + CountDownLatch generateCountDownLatch = new CountDownLatch(allPartitionNum); ThreadFactory generationThreadFactory = new ThreadFactoryBuilder() .setNameFormat("baseline-generate-%d").build(); ThreadPoolExecutor generationExecutor = new ThreadPoolExecutor( - threadPoolNum, threadPoolNum, 0L, + ApplicationConfig.CORE_POOL_SIZE, ApplicationConfig.MAX_POOL_SIZE, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), generationThreadFactory, new ThreadPoolExecutor.AbortPolicy()); - for(int threadCount = 0; threadCount