diff --git a/ip-learning-spark/src/main/java/cn/ac/iie/dao/BaseArangoData.java b/ip-learning-spark/src/main/java/cn/ac/iie/dao/BaseArangoData.java index a141219..2bb9879 100644 --- a/ip-learning-spark/src/main/java/cn/ac/iie/dao/BaseArangoData.java +++ b/ip-learning-spark/src/main/java/cn/ac/iie/dao/BaseArangoData.java @@ -43,9 +43,11 @@ public class BaseArangoData { historyMap.put(i, new ConcurrentHashMap<>()); } CountDownLatch countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER()); - long[] timeRange = getTimeRange(table); +// long[] timeRange = getTimeRange(table); + Long total = getCountTotal(table); for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER(); i++) { - String sql = getQuerySql(timeRange, i, table); +// String sql = getQuerySql(timeRange, i, table); + String sql = getQuerySql(total, i, table); ReadHistoryArangoData readHistoryArangoData = new ReadHistoryArangoData<>(arangoDBConnect, sql, historyMap, type, table, countDownLatch); threadPool.executor(readHistoryArangoData); } @@ -57,6 +59,29 @@ public class BaseArangoData { } } + private Long getCountTotal(String table){ + long start = System.currentTimeMillis(); + Long cnt = 0L; + String sql = "RETURN LENGTH("+table+")"; + try { + ArangoCursor longs = arangoDBConnect.executorQuery(sql, Long.class); + while (longs.hasNext()){ + cnt = longs.next(); + } + }catch (Exception e){ + LOG.error(sql +"执行异常"); + } + long last = System.currentTimeMillis(); + LOG.info(sql+" 结果:"+cnt+" 执行时间:"+(last-start)); + return cnt; + } + + private String getQuerySql(Long cnt,int threadNumber, String table){ + long sepNum = cnt / ApplicationConfig.THREAD_POOL_NUMBER() + 1; + long offsetNum = threadNumber * sepNum; + return "FOR doc IN " + table + " limit "+offsetNum+","+sepNum+" RETURN doc"; + } + private long[] getTimeRange(String table) { long minTime = 0L; long maxTime = 0L;