This commit is contained in:
yinjiangyi
2021-08-05 18:31:32 +08:00
parent d57429c235
commit cf6ea18b91
5 changed files with 11 additions and 148 deletions

View File

@@ -49,7 +49,7 @@ public class ApplicationConfig {
public static final Integer BASELINE_GENERATE_BATCH_SIZE = ConfigUtils.getIntProperty("baseline.generate.batch.size");
public static final Long DRUID_READ_BATCH_TIME_GRAD_HOUR = ConfigUtils.getLongProperty("druid.read.batch.time.grad.hour");
public static final Integer THREAD_MAX_NUM = ConfigUtils.getIntProperty("thread.max.num");
public static final Integer THREAD_POOL_NUM = ConfigUtils.getIntProperty("thread.pool.num");
// http config
public static final Integer HTTP_REQUEST_TIMEOUT = ConfigUtils.getIntProperty("http.request.timeout");

View File

@@ -35,7 +35,6 @@ public class BaselineGeneration {
private static final Tuple2<Long, Long> START_END_TIMES = DruidData.getTimeLimit();
private static final Map<String, List<Map<String, Object>>> allFromDruid = new HashMap<>();
private static int threadNum = ApplicationConfig.THREAD_MAX_NUM;
/**
* 程序执行
@@ -65,19 +64,19 @@ public class BaselineGeneration {
long start = System.currentTimeMillis();
long timeGrad = 3600000 * ApplicationConfig.DRUID_READ_BATCH_TIME_GRAD_HOUR;
int threadPoolNum = (int) ((START_END_TIMES._2-START_END_TIMES._1)/timeGrad);
int loadDataThreadPoolNum = (int) ((START_END_TIMES._2-START_END_TIMES._1)/timeGrad);
ArrayList<Future<Map<String, List<Map<String, Object>>>>> resultList = new ArrayList<>();
CountDownLatch loadDataCountDownLatch = new CountDownLatch(threadPoolNum);
CountDownLatch loadDataCountDownLatch = new CountDownLatch(loadDataThreadPoolNum);
ThreadFactory loadDataThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("baseline-load-data-%d").build();
ThreadPoolExecutor loadDataExecutor = new ThreadPoolExecutor(
threadNum, threadNum, 0L,
loadDataThreadPoolNum, loadDataThreadPoolNum, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), loadDataThreadFactory,
new ThreadPoolExecutor.AbortPolicy());
// 按ip数分区
for (int i = 0; i < threadPoolNum; i++) {
for (int i = 0; i < loadDataThreadPoolNum; i++) {
String sql = DruidData.getDruidQuerySql(ATTACK_TYPE_LIST, START_END_TIMES._1, i, timeGrad);
ReadHistoricalDruidData readHistoricalDruidData = new ReadHistoricalDruidData(
sql,
@@ -115,17 +114,19 @@ public class BaselineGeneration {
* @throws InterruptedException
*/
private static void baselineGenration() throws InterruptedException {
List<Map<String, List<Map<String, Object>>>> batchDruidDataLists = DruidData.splitMap(allFromDruid, ApplicationConfig.BASELINE_GENERATE_BATCH_SIZE);
int generationThreadPoolNum = batchDruidDataLists.size();
CountDownLatch generateCountDownLatch = new CountDownLatch(generationThreadPoolNum);
ThreadFactory generationThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("baseline-generate-%d").build();
ThreadPoolExecutor generationExecutor = new ThreadPoolExecutor(
threadNum, threadNum, 0L,
generationThreadPoolNum, generationThreadPoolNum, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), generationThreadFactory,
new ThreadPoolExecutor.AbortPolicy());
LOG.info("Baseline batch 大小: " + ApplicationConfig.BASELINE_GENERATE_BATCH_SIZE);
List<Map<String, List<Map<String, Object>>>> batchDruidDataLists = DruidData.splitMap(allFromDruid, ApplicationConfig.BASELINE_GENERATE_BATCH_SIZE);
CountDownLatch generateCountDownLatch = new CountDownLatch(batchDruidDataLists.size());
for (Map<String, List<Map<String, Object>>>batchDruidData: batchDruidDataLists){
if(batchDruidData.size()>0){
BaselineSingleThread baselineSingleThread = new BaselineSingleThread(

View File

@@ -1,67 +0,0 @@
package cn.mesalab.utils;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.omg.PortableInterceptor.INACTIVE;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.*;
public class ExecutorThreadPool {
private static final Logger LOG = LoggerFactory.getLogger(ExecutorThreadPool.class);
private static ExecutorService pool = null ;
private static ExecutorThreadPool poolExecutor = null;
private int threadPoolNum;
static {
getThreadPool();
}
public ExecutorThreadPool(Integer threadPoolNum){
this.threadPoolNum.
}
private static void getThreadPool(){
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("iplearning-application-pool-%d").build();
pool = new ThreadPoolExecutor(ApplicationConfig.THREAD_POOL_NUMBER, ApplicationConfig.THREAD_POOL_NUMBER,
0L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
}
public static ExecutorThreadPool getInstance(){
if (null == poolExecutor){
poolExecutor = new ExecutorThreadPool();
}
return poolExecutor;
}
public void executor(Runnable command){
pool.execute(command);
}
@Deprecated
public void awaitThreadTask(){
try {
while (!pool.awaitTermination(ApplicationConfig.THREAD_AWAIT_TERMINATION_TIME, TimeUnit.SECONDS)) {
LOG.warn("线程池没有关闭");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void shutdown(){
pool.shutdown();
}
@Deprecated
public static Long getThreadNumber(){
String name = Thread.currentThread().getName();
String[] split = name.split("-");
return Long.parseLong(split[3]);
}
}