wanglihui-ip-learning-graph/ip-learning-spark/src/main/java/cn/ac/iie/dao/BaseArangoData.java

package cn.ac.iie.dao;

import cn.ac.iie.config.ApplicationConfig;
import cn.ac.iie.service.read.ReadHistoryArangoData;
import cn.ac.iie.utils.ArangoDBConnect;
import cn.ac.iie.utils.ExecutorThreadPool;
import com.arangodb.ArangoCursor;
import com.arangodb.entity.BaseDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

/**
 * Reads historical data from ArangoDB.
 *
 * @author wlh
 */
public class BaseArangoData {

    private static final Logger LOG = LoggerFactory.getLogger(BaseArangoData.class);
    private static ArangoDBConnect arangoDBConnect = ArangoDBConnect.getInstance();
    private ExecutorThreadPool threadPool = ExecutorThreadPool.getInstance();
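
    /**
     * Loads the documents of {@code table} into memory, split across
     * {@code ApplicationConfig.THREAD_POOL_NUMBER()} reader threads. The returned
     * map is pre-populated with one bucket per thread and is filled concurrently
     * by {@link ReadHistoryArangoData} workers.
     *
     * @param table ArangoDB collection name
     * @param type  document class to deserialize into
     * @return per-thread maps of the collection's documents
     */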
    public <T extends BaseDocument> ConcurrentHashMap<Integer, ConcurrentHashMap<String, T>> readHistoryData(String table, Class<T> type) {
        ConcurrentHashMap<Integer, ConcurrentHashMap<String, T>> historyMap = new ConcurrentHashMap<>();
        try {
            LOG.warn("Start updating " + table);
            long start = System.currentTimeMillis();
            // Pre-create one bucket per reader thread.
            for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER(); i++) {
                historyMap.put(i, new ConcurrentHashMap<>());
            }
            CountDownLatch countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER());
            Long countTotal = getCountTotal(table);
            // Submit one reader task per thread, each with its own slice query.
            for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER(); i++) {
                String sql = getQuerySql(countTotal, i, table);
                ReadHistoryArangoData<T> readHistoryArangoData = new ReadHistoryArangoData<>(arangoDBConnect, sql, historyMap, type, table, countDownLatch);
                threadPool.executor(readHistoryArangoData);
            }
            // Block until every reader task has counted down the latch.
            countDownLatch.await();
            long last = System.currentTimeMillis();
            LOG.warn("Reading " + table + " from ArangoDB took " + (last - start) + " ms");
        } catch (Exception e) {
            LOG.error("Failed to read history data from " + table, e);
        }
        return historyMap;
    }
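
    /**
     * Counts the documents in {@code table} via the AQL query
     * {@code RETURN LENGTH(<table>)}; returns 0 if the query fails.
     */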
    private Long getCountTotal(String table) {
        long start = System.currentTimeMillis();
        Long cnt = 0L;
        String sql = "RETURN LENGTH(" + table + ")";
        try {
            ArangoCursor<Long> longs = arangoDBConnect.executorQuery(sql, Long.class);
            while (longs.hasNext()) {
                cnt = longs.next();
            }
        } catch (Exception e) {
            LOG.error(sql + " failed to execute", e);
        }
        long last = System.currentTimeMillis();
        LOG.warn(sql + " result: " + cnt + ", took " + (last - start) + " ms");
        return cnt;
    }
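
    /**
     * Builds the AQL query that reads one thread's slice of the collection.
     * The offset is threadNumber * (total / THREAD_POOL_NUMBER() + 1), and the
     * slice length is capped at ARANGODB_READ_LIMIT() * 10000 documents.
     */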
    private String getQuerySql(Long cnt, int threadNumber, String table) {
        // Split the collection evenly across the thread pool.
        long sepNum = cnt / ApplicationConfig.THREAD_POOL_NUMBER() + 1;
        long offsetNum = threadNumber * sepNum;
        // Cap the slice length at the configured read limit (in units of 10,000 documents).
        if (sepNum >= ApplicationConfig.ARANGODB_READ_LIMIT() * 10000) {
            sepNum = ApplicationConfig.ARANGODB_READ_LIMIT() * 10000;
        }
        return "FOR doc IN " + table + " LIMIT " + offsetNum + "," + sepNum + " RETURN doc";
    }
}
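
// A minimal usage sketch. The collection name "ip_vertex" below is a made-up
// example, and the assumption that ReadHistoryArangoData keys each per-thread
// bucket by the document key is not confirmed by this file:
//
//     BaseArangoData arangoData = new BaseArangoData();
//     ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseDocument>> history =
//             arangoData.readHistoryData("ip_vertex", BaseDocument.class);
//     history.forEach((threadId, docs) ->
//             System.out.println("thread " + threadId + " read " + docs.size() + " docs"));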