增加核心和最大线程参数设置
This commit is contained in:
@@ -59,7 +59,11 @@ public class ApplicationConfig {
|
|||||||
public static final Double BASELINE_KALMAN_P = ConfigUtils.getDoubleProperty("baseline.kalman.p");
|
public static final Double BASELINE_KALMAN_P = ConfigUtils.getDoubleProperty("baseline.kalman.p");
|
||||||
public static final Double BASELINE_KALMAN_M = ConfigUtils.getDoubleProperty("baseline.kalman.m");
|
public static final Double BASELINE_KALMAN_M = ConfigUtils.getDoubleProperty("baseline.kalman.m");
|
||||||
|
|
||||||
public static final Integer THREAD_POOL_NUM = ConfigUtils.getIntProperty("thread.pool.num");
|
public static final Integer ALL_PARTITION_NUM = ConfigUtils.getIntProperty("all.partition.num");
|
||||||
|
public static final Integer MAX_POOL_SIZE = ConfigUtils.getIntProperty("max.pool.size");
|
||||||
|
public static final Integer CORE_POOL_SIZE = ConfigUtils.getIntProperty("core.pool.size");
|
||||||
|
|
||||||
|
|
||||||
public static final Integer PARTITION_NUM_MAX = ConfigUtils.getIntProperty("druid.partition.num.max");
|
public static final Integer PARTITION_NUM_MAX = ConfigUtils.getIntProperty("druid.partition.num.max");
|
||||||
public static final Integer DRUID_STATEMENT_QUERY_TIMEOUT = ConfigUtils.getIntProperty("druid.statement.query.timeout");
|
public static final Integer DRUID_STATEMENT_QUERY_TIMEOUT = ConfigUtils.getIntProperty("druid.statement.query.timeout");
|
||||||
|
|
||||||
|
|||||||
@@ -2,11 +2,8 @@ package cn.mesalab.service;
|
|||||||
|
|
||||||
import cn.mesalab.config.ApplicationConfig;
|
import cn.mesalab.config.ApplicationConfig;
|
||||||
import cn.mesalab.dao.DruidData;
|
import cn.mesalab.dao.DruidData;
|
||||||
import cn.mesalab.utils.HbaseUtils;
|
|
||||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
import io.vavr.Tuple2;
|
import io.vavr.Tuple2;
|
||||||
import org.apache.commons.collections.ListUtils;
|
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
@@ -32,9 +29,9 @@ public class BaselineGeneration {
|
|||||||
ApplicationConfig.BASELINE_RANGE_DAYS * 24 * (60/ApplicationConfig.HISTORICAL_GRAD);
|
ApplicationConfig.BASELINE_RANGE_DAYS * 24 * (60/ApplicationConfig.HISTORICAL_GRAD);
|
||||||
|
|
||||||
private static final Tuple2<Long, Long> START_END_TIMES = DruidData.getTimeLimit();
|
private static final Tuple2<Long, Long> START_END_TIMES = DruidData.getTimeLimit();
|
||||||
private static final int threadPoolNum = ApplicationConfig.THREAD_POOL_NUM;
|
private static final int allPartitionNum = ApplicationConfig.ALL_PARTITION_NUM;
|
||||||
// 每个线程读取数据所覆盖的partition_num个数
|
// 每个线程读取数据所覆盖的partition_num个数
|
||||||
private static final int batchPartitionRange = (int) Math.ceil(ApplicationConfig.PARTITION_NUM_MAX /(double)threadPoolNum);
|
private static final int batchPartitionRange = (int) Math.ceil(ApplicationConfig.PARTITION_NUM_MAX /(double) allPartitionNum);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 程序执行
|
* 程序执行
|
||||||
@@ -54,16 +51,16 @@ public class BaselineGeneration {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private static void baselineGeneration() throws InterruptedException {
|
private static void baselineGeneration() throws InterruptedException {
|
||||||
CountDownLatch generateCountDownLatch = new CountDownLatch(threadPoolNum);
|
CountDownLatch generateCountDownLatch = new CountDownLatch(allPartitionNum);
|
||||||
|
|
||||||
ThreadFactory generationThreadFactory = new ThreadFactoryBuilder()
|
ThreadFactory generationThreadFactory = new ThreadFactoryBuilder()
|
||||||
.setNameFormat("baseline-generate-%d").build();
|
.setNameFormat("baseline-generate-%d").build();
|
||||||
ThreadPoolExecutor generationExecutor = new ThreadPoolExecutor(
|
ThreadPoolExecutor generationExecutor = new ThreadPoolExecutor(
|
||||||
threadPoolNum, threadPoolNum, 0L,
|
ApplicationConfig.CORE_POOL_SIZE, ApplicationConfig.MAX_POOL_SIZE, 0L,
|
||||||
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), generationThreadFactory,
|
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), generationThreadFactory,
|
||||||
new ThreadPoolExecutor.AbortPolicy());
|
new ThreadPoolExecutor.AbortPolicy());
|
||||||
|
|
||||||
for(int threadCount = 0; threadCount<threadPoolNum; threadCount++){
|
for(int threadCount = 0; threadCount< allPartitionNum; threadCount++){
|
||||||
BaselineSingleThread baselineSingleThread = new BaselineSingleThread(
|
BaselineSingleThread baselineSingleThread = new BaselineSingleThread(
|
||||||
ATTACK_TYPE_LIST,
|
ATTACK_TYPE_LIST,
|
||||||
BASELINE_POINT_NUM,
|
BASELINE_POINT_NUM,
|
||||||
|
|||||||
@@ -68,7 +68,9 @@ monitor.frequency.bin.num=100
|
|||||||
##########################################
|
##########################################
|
||||||
################ 并发参数 #################
|
################ 并发参数 #################
|
||||||
##########################################
|
##########################################
|
||||||
thread.pool.num=5
|
all.partition.num=100
|
||||||
|
max.pool.size=1
|
||||||
|
core.pool.size=3
|
||||||
#druid分区字段partition_num的最大值为9999
|
#druid分区字段partition_num的最大值为9999
|
||||||
druid.statement.query.timeout=36000
|
druid.statement.query.timeout=36000
|
||||||
druid.partition.num.max=10000
|
druid.partition.num.max=10000
|
||||||
|
|||||||
Reference in New Issue
Block a user