Thread blocking

yinjiangyi
2021-08-05 11:30:42 +08:00
parent 7eeaee542d
commit 307f283134
3 changed files with 31 additions and 12 deletions

ReadHistoricalDruidData.java

@@ -9,6 +9,7 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.Map;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
 /**
  * @author yjy
@@ -19,11 +20,14 @@ public class ReadHistoricalDruidData implements Callable<ArrayList<Map<String, O
     private static final Logger LOG = LoggerFactory.getLogger(ReadHistoricalDruidData.class);
     private String sql;
+    private CountDownLatch countDownLatch;
     public ReadHistoricalDruidData(
-            String sql
+            String sql,
+            CountDownLatch countDownLatch
     ){
         this.sql = sql;
+        this.countDownLatch = countDownLatch;
     }
     @Override
@@ -34,12 +38,18 @@ public class ReadHistoricalDruidData implements Callable<ArrayList<Map<String, O
             AvaticaConnection connection = DruidUtils.getConn();
             AvaticaStatement stat = connection.createStatement();
             resultData.addAll(DruidData.readFromDruid(sql, stat));
             long end = System.currentTimeMillis();
             LOG.info(sql + "\nRead " + resultData.size() + " rows, elapsed time: " + (end - start));
             connection.close();
             stat.close();
         } catch (Exception e) {
             e.printStackTrace();
+        } finally {
+            countDownLatch.countDown();
+            LOG.info("This thread finished reading, remaining thread count: " + countDownLatch.getCount());
         }
         return resultData;
     }
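The worker-side pattern above is: take the shared CountDownLatch through the constructor, run the query inside try, and count down in finally so the waiting thread is released even when the read throws. A minimal, self-contained sketch of the same pattern (class and field names here are illustrative, not the project's):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;

// Illustrative worker: runs one query and always counts the latch down, even on failure.
class QueryTask implements Callable<List<String>> {
    private final String sql;
    private final CountDownLatch latch;

    QueryTask(String sql, CountDownLatch latch) {
        this.sql = sql;
        this.latch = latch;
    }

    @Override
    public List<String> call() {
        List<String> rows = new ArrayList<>();
        try {
            rows.add("row for: " + sql);   // stand-in for the real JDBC read against Druid
        } finally {
            latch.countDown();             // runs on success and on exception alike
            System.out.println("tasks still running: " + latch.getCount());
        }
        return rows;
    }
}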

BaselineGeneration.java

@@ -85,15 +85,21 @@ public class BaselineGeneration {
                 TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), loadDataThreadFactory,
                 new ThreadPoolExecutor.AbortPolicy());
         long timeGrad = 3600000 * ApplicationConfig.DRUID_READ_BATCH_TIME_GRAD_HOUR;
+        int threadPoolNum = (int) ((START_END_TIMES._1-START_END_TIMES._2)/timeGrad);
         ArrayList<Future<ArrayList<Map<String, Object>>>> resultList = new ArrayList<>();
-        for (int i = 0; i < (START_END_TIMES._1-START_END_TIMES._2)/timeGrad; i++) {
+        CountDownLatch loadDataCountDownLatch = new CountDownLatch(threadPoolNum);
+        for (int i = 0; i < threadPoolNum; i++) {
             String sql = DruidData.getDruidQuerySql(START_END_TIMES._1, i, timeGrad);
             ReadHistoricalDruidData readHistoricalDruidData = new ReadHistoricalDruidData(
-                    sql
+                    sql,
+                    loadDataCountDownLatch
             );
             Future<ArrayList<Map<String, Object>>> future = loadDataExecutor.submit(readHistoricalDruidData);
             resultList.add(future);
         }
+        loadDataExecutor.shutdown();
+        loadDataCountDownLatch.await();
         for(Future<ArrayList<Map<String, Object>>> future: resultList){
             try {
                 if(future.get()!=null){
@@ -105,12 +111,8 @@ public class BaselineGeneration {
                 e.printStackTrace();
             }
         }
         long last = System.currentTimeMillis();
         LOG.info("Druid data load total time: "+(last-start));
-        loadDataExecutor.shutdown();
-        loadDataExecutor.awaitTermination(10L, TimeUnit.HOURS);
         // BaseLine generation
         // Get IP list
@@ -127,7 +129,7 @@ public class BaselineGeneration {
         // Generate and process IP baselines in batches
         List<List<String>> batchIpLists = Lists.partition(destinationIps, ApplicationConfig.BASELINE_GENERATE_BATCH_SIZE);
+        CountDownLatch generateCountDownLatch = new CountDownLatch(batchIpLists.size());
         for (List<String> batchIps: batchIpLists){
             if(batchIps.size()>0){
                 BaselineSingleThread baselineSingleThread = new BaselineSingleThread(
@@ -136,14 +138,14 @@ public class BaselineGeneration {
                         ATTACK_TYPE_LIST,
                         BASELINE_POINT_NUM,
                         TIME_FILTER,
-                        allFromDruid
+                        allFromDruid,
+                        generateCountDownLatch
                 );
                 generationExecutor.execute(baselineSingleThread);
             }
         }
         generationExecutor.shutdown();
-        generationExecutor.awaitTermination(10L, TimeUnit.HOURS);
+        generateCountDownLatch.await();
     }
 }
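Both pools in this file now follow the same shape: size a CountDownLatch to the number of tasks that will be submitted, submit them, call shutdown(), and block on await() rather than awaitTermination(). A sketch of that coordinator flow with illustrative names; the bounded wait is an assumption added here, not part of this commit:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class LoadCoordinator {
    public static void main(String[] args) throws InterruptedException {
        int taskNum = 4;                                     // stands in for threadPoolNum
        ExecutorService pool = Executors.newFixedThreadPool(taskNum);
        CountDownLatch latch = new CountDownLatch(taskNum);  // one count per submitted task

        List<Future<String>> futures = new ArrayList<>();
        for (int i = 0; i < taskNum; i++) {
            final int batch = i;
            futures.add(pool.submit(() -> {
                try {
                    return "result of batch " + batch;       // stand-in for the Druid read
                } finally {
                    latch.countDown();                       // always release the waiting thread
                }
            }));
        }

        pool.shutdown();                                     // no new tasks; running ones continue
        if (!latch.await(10, TimeUnit.HOURS)) {              // block until every task counted down
            System.err.println("data load did not finish in time");
        }

        for (Future<String> f : futures) {
            try {
                System.out.println(f.get());                 // tasks are already done at this point
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
}

If a task is rejected or never submitted, the latch count never reaches zero, which is why a timeout on await() is a useful guard.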

BaselineSingleThread.java

@@ -18,6 +18,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 import java.util.stream.Collectors;
 /**
@@ -35,6 +36,7 @@ public class BaselineSingleThread extends Thread {
     private String timeFilter;
     private List<Map<String, Object>> batchDruidData;
     private List<Map<String, Object>> historicalData;
+    private CountDownLatch countDownLatch;
     public BaselineSingleThread(
             List<String> batchIpList,
@@ -42,7 +44,8 @@ public class BaselineSingleThread extends Thread {
             List<String> attackTypeList,
             Integer BASELINE_POINT_NUM,
             String timeFilter,
-            List<Map<String, Object>> historicalData
+            List<Map<String, Object>> historicalData,
+            CountDownLatch countDownLatch
     ){
         this.ipList = batchIpList;
         this.hbaseTable = hbaseTable;
@@ -50,6 +53,7 @@ public class BaselineSingleThread extends Thread {
         this.BASELINE_POINT_NUM = BASELINE_POINT_NUM;
         this.timeFilter = timeFilter;
         this.historicalData = historicalData;
+        this.countDownLatch = countDownLatch;
     }
     @Override
@@ -70,6 +74,9 @@ public class BaselineSingleThread extends Thread {
LOG.info(" 成功写入Baseline条数共计 " + putList.size()); LOG.info(" 成功写入Baseline条数共计 " + putList.size());
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
} finally {
countDownLatch.countDown();
LOG.info("本线程读取完毕,剩余线程数量:" + countDownLatch.getCount());
} }
} }
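BaselineSingleThread extends Thread but is only ever handed to generationExecutor.execute(), so the Thread machinery goes unused; a plain Runnable that receives the latch through its constructor expresses the same behaviour. A minimal sketch under that assumption (names illustrative, not the project's):

import java.util.List;
import java.util.concurrent.CountDownLatch;

// Illustrative batch worker: builds baselines for one batch of IPs, then always releases the latch.
class BaselineBatchTask implements Runnable {
    private final List<String> ipBatch;
    private final CountDownLatch latch;

    BaselineBatchTask(List<String> ipBatch, CountDownLatch latch) {
        this.ipBatch = ipBatch;
        this.latch = latch;
    }

    @Override
    public void run() {
        try {
            System.out.println("generating baselines for " + ipBatch.size() + " IPs"); // stand-in for the HBase write
        } catch (RuntimeException e) {
            e.printStackTrace();              // a failed batch must not leave the coordinator waiting
        } finally {
            latch.countDown();
            System.out.println("batches still running: " + latch.getCount());
        }
    }
}

For the await() in BaselineGeneration to return, the latch must be created with exactly one count per batch that is actually submitted.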