diff --git a/src/main/java/cn/mesalab/config/ApplicationConfig.java b/src/main/java/cn/mesalab/config/ApplicationConfig.java index 1e94280..300be5d 100644 --- a/src/main/java/cn/mesalab/config/ApplicationConfig.java +++ b/src/main/java/cn/mesalab/config/ApplicationConfig.java @@ -49,7 +49,7 @@ public class ApplicationConfig { public static final Integer BASELINE_GENERATE_BATCH_SIZE = ConfigUtils.getIntProperty("baseline.generate.batch.size"); public static final Long DRUID_READ_BATCH_TIME_GRAD_HOUR = ConfigUtils.getLongProperty("druid.read.batch.time.grad.hour"); - public static final Integer THREAD_MAX_NUM = ConfigUtils.getIntProperty("thread.max.num"); + public static final Integer THREAD_POOL_NUM = ConfigUtils.getIntProperty("thread.pool.num"); // http config public static final Integer HTTP_REQUEST_TIMEOUT = ConfigUtils.getIntProperty("http.request.timeout"); diff --git a/src/main/java/cn/mesalab/service/BaselineGeneration.java b/src/main/java/cn/mesalab/service/BaselineGeneration.java index 9c73b5f..0ba8d55 100644 --- a/src/main/java/cn/mesalab/service/BaselineGeneration.java +++ b/src/main/java/cn/mesalab/service/BaselineGeneration.java @@ -35,7 +35,6 @@ public class BaselineGeneration { private static final Tuple2 START_END_TIMES = DruidData.getTimeLimit(); private static final Map>> allFromDruid = new HashMap<>(); - private static int threadNum = ApplicationConfig.THREAD_MAX_NUM; /** * 程序执行 @@ -65,19 +64,19 @@ public class BaselineGeneration { long start = System.currentTimeMillis(); long timeGrad = 3600000 * ApplicationConfig.DRUID_READ_BATCH_TIME_GRAD_HOUR; - int threadPoolNum = (int) ((START_END_TIMES._2-START_END_TIMES._1)/timeGrad); + int loadDataThreadPoolNum = (int) ((START_END_TIMES._2-START_END_TIMES._1)/timeGrad); ArrayList>>>> resultList = new ArrayList<>(); - CountDownLatch loadDataCountDownLatch = new CountDownLatch(threadPoolNum); + CountDownLatch loadDataCountDownLatch = new CountDownLatch(loadDataThreadPoolNum); ThreadFactory loadDataThreadFactory = new ThreadFactoryBuilder() .setNameFormat("baseline-load-data-%d").build(); ThreadPoolExecutor loadDataExecutor = new ThreadPoolExecutor( - threadNum, threadNum, 0L, + loadDataThreadPoolNum, loadDataThreadPoolNum, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), loadDataThreadFactory, new ThreadPoolExecutor.AbortPolicy()); // 按ip数分区 - for (int i = 0; i < threadPoolNum; i++) { + for (int i = 0; i < loadDataThreadPoolNum; i++) { String sql = DruidData.getDruidQuerySql(ATTACK_TYPE_LIST, START_END_TIMES._1, i, timeGrad); ReadHistoricalDruidData readHistoricalDruidData = new ReadHistoricalDruidData( sql, @@ -115,17 +114,19 @@ public class BaselineGeneration { * @throws InterruptedException */ private static void baselineGenration() throws InterruptedException { + List>>> batchDruidDataLists = DruidData.splitMap(allFromDruid, ApplicationConfig.BASELINE_GENERATE_BATCH_SIZE); + int generationThreadPoolNum = batchDruidDataLists.size(); + CountDownLatch generateCountDownLatch = new CountDownLatch(generationThreadPoolNum); + ThreadFactory generationThreadFactory = new ThreadFactoryBuilder() .setNameFormat("baseline-generate-%d").build(); ThreadPoolExecutor generationExecutor = new ThreadPoolExecutor( - threadNum, threadNum, 0L, + generationThreadPoolNum, generationThreadPoolNum, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), generationThreadFactory, new ThreadPoolExecutor.AbortPolicy()); LOG.info("Baseline batch 大小: " + ApplicationConfig.BASELINE_GENERATE_BATCH_SIZE); - List>>> batchDruidDataLists = DruidData.splitMap(allFromDruid, ApplicationConfig.BASELINE_GENERATE_BATCH_SIZE); - CountDownLatch generateCountDownLatch = new CountDownLatch(batchDruidDataLists.size()); for (Map>>batchDruidData: batchDruidDataLists){ if(batchDruidData.size()>0){ BaselineSingleThread baselineSingleThread = new BaselineSingleThread( diff --git a/src/main/java/cn/mesalab/utils/ExecutorThreadPool.java b/src/main/java/cn/mesalab/utils/ExecutorThreadPool.java deleted file mode 100644 index bb0e8f5..0000000 --- a/src/main/java/cn/mesalab/utils/ExecutorThreadPool.java +++ /dev/null @@ -1,67 +0,0 @@ -package cn.mesalab.utils; - -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.omg.PortableInterceptor.INACTIVE; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.*; - -public class ExecutorThreadPool { - private static final Logger LOG = LoggerFactory.getLogger(ExecutorThreadPool.class); - private static ExecutorService pool = null ; - private static ExecutorThreadPool poolExecutor = null; - private int threadPoolNum; - - static { - getThreadPool(); - } - - public ExecutorThreadPool(Integer threadPoolNum){ - this.threadPoolNum. - } - - private static void getThreadPool(){ - ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() - .setNameFormat("iplearning-application-pool-%d").build(); - pool = new ThreadPoolExecutor(ApplicationConfig.THREAD_POOL_NUMBER, ApplicationConfig.THREAD_POOL_NUMBER, - 0L, TimeUnit.SECONDS, - new LinkedBlockingQueue<>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy()); - } - - public static ExecutorThreadPool getInstance(){ - if (null == poolExecutor){ - poolExecutor = new ExecutorThreadPool(); - } - return poolExecutor; - } - - public void executor(Runnable command){ - pool.execute(command); - } - - @Deprecated - public void awaitThreadTask(){ - try { - while (!pool.awaitTermination(ApplicationConfig.THREAD_AWAIT_TERMINATION_TIME, TimeUnit.SECONDS)) { - LOG.warn("线程池没有关闭"); - } - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - - public void shutdown(){ - pool.shutdown(); - } - - @Deprecated - public static Long getThreadNumber(){ - String name = Thread.currentThread().getName(); - String[] split = name.split("-"); - return Long.parseLong(split[3]); - } - - - -} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 419cee5..a49cadd 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -53,7 +53,7 @@ baseline.kalman.r=4 # FOR TEST baseline.generate.batch.size=100 druid.read.batch.time.grad.hour=4 -thread.max.num=10 +thread.pool.num=10 # http client配置 diff --git a/src/test/java/cn/mesalab/utils/HttpClientUtilsTest.java b/src/test/java/cn/mesalab/utils/HttpClientUtilsTest.java deleted file mode 100644 index a29cd3e..0000000 --- a/src/test/java/cn/mesalab/utils/HttpClientUtilsTest.java +++ /dev/null @@ -1,71 +0,0 @@ -package cn.mesalab.utils; - -import cn.mesalab.config.ApplicationConfig; -import cn.mesalab.dao.DruidData; -import com.google.common.collect.Maps; -import com.zdjizhi.utils.JsonMapper; -import sun.net.util.URLUtil; - -import java.net.URL; -import java.util.Map; - -/** - * @author yjy - * @version 1.0 - * @date 2021/8/3 4:43 下午 - */ -public class HttpClientUtilsTest { - private static HttpClientUtils httpClientUtils = new HttpClientUtils(); - - public static void main(String[] args) { - executeHttpPost("select * from top_server_ip_test_log limit 10"); - } - - private static Map executeHttpPost(String sql){ - String queryUrl = "http://192.168.44.12:8082/druid/v2/sql"; - DruidQueryParam druidQueryParam = getDruidQueryParam(sql); - int socketTimeout = ApplicationConfig.HTTP_RESPONSE_TIMEOUT; - Map stringStringMap = httpClientUtils.httpPost(queryUrl, JsonMapper.toJsonString(druidQueryParam), socketTimeout); - System.out.println(stringStringMap.toString()); - return stringStringMap; - } - - public static DruidQueryParam getDruidQueryParam(String sql) { - DruidQueryParam druidQueryParam = new DruidQueryParam(); - druidQueryParam.setQuery(sql); - druidQueryParam.getContext().put("skipEmptyBuckets", "true"); - druidQueryParam.setResultFormat("object"); - return druidQueryParam; - } -} - - -class DruidQueryParam { - private String query; - private Map context = Maps.newHashMap(); - private String resultFormat; - - public String getQuery() { - return query; - } - - public void setQuery(String query) { - this.query = query; - } - - public Map getContext() { - return context; - } - - public void setContext(Map context) { - this.context = context; - } - - public String getResultFormat() { - return resultFormat; - } - - public void setResultFormat(String resultFormat) { - this.resultFormat = resultFormat; - } -} \ No newline at end of file