diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java
index af47dcf..3a29959 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java
@@ -10,6 +10,7 @@ import com.arangodb.entity.BaseEdgeDocument;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 
@@ -43,9 +44,11 @@ public class BaseArangoData {
             historyMap.put(i, new ConcurrentHashMap<>());
         }
         CountDownLatch countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER);
-        long[] timeRange = getTimeRange(table);
+//        long[] timeRange = getTimeRange(table);
+        Long countTotal = getCountTotal(table);
         for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) {
-            String sql = getQuerySql(timeRange, i, table);
+//            String sql = getQuerySql(timeRange, i, table);
+            String sql = getQuerySql(countTotal, i, table);
             ReadHistoryArangoData readHistoryArangoData = new ReadHistoryArangoData<>(arangoDBConnect, sql, historyMap, type, table, countDownLatch);
             threadPool.executor(readHistoryArangoData);
         }
@@ -91,6 +94,23 @@ public class BaseArangoData {
 
     }
 
+    private Long getCountTotal(String table){
+        long start = System.currentTimeMillis();
+        Long cnt = 0L;
+        String sql = "RETURN LENGTH(" + table + ")";
+        try {
+            ArangoCursor longs = arangoDBConnect.executorQuery(sql, Long.class);
+            while (longs.hasNext()){
+                cnt = longs.next();
+            }
+        }catch (Exception e){
+            LOG.error(sql + " execution failed");
+        }
+        long last = System.currentTimeMillis();
+        LOG.info(sql + " result: " + cnt + " execution time: " + (last - start));
+        return cnt;
+    }
+
     private String getQuerySql(long[] timeRange, int threadNumber, String table) {
         long minTime = timeRange[0];
         long maxTime = timeRange[1];
@@ -100,4 +120,10 @@ public class BaseArangoData {
         return "FOR doc IN " + table + " filter doc.FIRST_FOUND_TIME >= " + minThreadTime + " and doc.FIRST_FOUND_TIME <= " + maxThreadTime + " " + ApplicationConfig.ARANGODB_READ_LIMIT + " RETURN doc";
     }
 
+    private String getQuerySql(Long cnt, int threadNumber, String table){
+        long sepNum = cnt / ApplicationConfig.THREAD_POOL_NUMBER + 1;
+        long offsetNum = threadNumber * sepNum;
+        return "FOR doc IN " + table + " limit " + offsetNum + "," + sepNum + " RETURN doc";
+    }
+
 }
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java b/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java
index b0e6e7a..56d65d1 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java
@@ -54,9 +54,9 @@ public class UpdateGraphData {
                 LocateFqdn2Ip.class,BaseEdgeDocument.class,
                 ReadClickhouseData::getRelationshipFqdnAddressIpSql, ReadClickhouseData::getRelationFqdnAddressIpDocument);
 
-//        updateDocument(newRelationIpVisitFqdnMap,historyRelationIpVisitFqdnMap,"R_VISIT_IP2FQDN",
-//                VisitIp2Fqdn.class,BaseEdgeDocument.class,
-//                ReadClickhouseData::getRelationshipIpVisitFqdnSql, ReadClickhouseData::getRelationIpVisitFqdnDocument);
+        updateDocument(newRelationIpVisitFqdnMap,historyRelationIpVisitFqdnMap,"R_VISIT_IP2FQDN",
+                VisitIp2Fqdn.class,BaseEdgeDocument.class,
+                ReadClickhouseData::getRelationshipIpVisitFqdnSql, ReadClickhouseData::getRelationIpVisitFqdnDocument);
 
         updateDocument(newRelationFqdnSameFqdnMap,historyRelationFqdnSameFqdnMap,"R_SAME_ORIGIN_FQDN2FQDN",
                 SameFqdn2Fqdn.class,BaseEdgeDocument.class,
diff --git a/IP-learning-graph/src/main/resources/application.properties b/IP-learning-graph/src/main/resources/application.properties
index 0265978..0d1ea6d 100644
--- a/IP-learning-graph/src/main/resources/application.properties
+++ b/IP-learning-graph/src/main/resources/application.properties
@@ -3,8 +3,7 @@ arangoDB.host=192.168.40.182
 arangoDB.port=8529
 arangoDB.user=upsert
 arangoDB.password=ceiec2018
-arangoDB.DB.name=ip-learning-test-0
-#arangoDB.DB.name=insert_iplearn_index
+arangoDB.DB.name=ip-learning-test
 arangoDB.batch=100000
 arangoDB.ttl=3600
 
@@ -16,12 +15,12 @@ thread.await.termination.time=10
 
 #How the clickhouse read time range is chosen, 0: read the past hour, 1: use the specified time range
-clickhouse.time.limit.type=1
+clickhouse.time.limit.type=0
 read.clickhouse.max.time=1571245230
 read.clickhouse.min.time=1571245220
 
 
 #How the arangoDB read time range is chosen, 0: normal read, 1: use the specified time range
-arango.time.limit.type=1
+arango.time.limit.type=0
 read.arango.max.time=1571245220
 read.arango.min.time=1571245210
diff --git a/IP-learning-graph/src/test/java/cn/ac/iie/readHistoryDataTest.java b/IP-learning-graph/src/test/java/cn/ac/iie/readHistoryDataTest.java
index 56f9b50..4704185 100644
--- a/IP-learning-graph/src/test/java/cn/ac/iie/readHistoryDataTest.java
+++ b/IP-learning-graph/src/test/java/cn/ac/iie/readHistoryDataTest.java
@@ -1,10 +1,27 @@
 package cn.ac.iie;
 
 import cn.ac.iie.dao.BaseArangoData;
+import cn.ac.iie.utils.ArangoDBConnect;
+import com.arangodb.ArangoCursor;
+import com.arangodb.entity.BaseDocument;
 import com.arangodb.entity.BaseEdgeDocument;
 
 public class readHistoryDataTest {
     public static void main(String[] args) {
-        BaseArangoData baseArangoData = new BaseArangoData();
+        ArangoDBConnect instance = ArangoDBConnect.getInstance();
+//        ArangoCursor baseDocuments = instance.executorQuery("RETURN LENGTH(R_LOCATE_FQDN2IP)", Long.class);
+//        while (baseDocuments.hasNext()){
+//            Long next = baseDocuments.next();
+//            System.out.println(next.toString());
+//        }
+//        instance.clean();
+
+        String sql = "FOR doc IN FQDN filter doc.FIRST_FOUND_TIME >= 1595423493 and doc.FIRST_FOUND_TIME <= 1595809766 limit 763,10 RETURN doc";
+        ArangoCursor baseDocuments = instance.executorQuery(sql, BaseDocument.class);
+        while (baseDocuments.hasNext()){
+            BaseDocument next = baseDocuments.next();
+            System.out.println(next.toString());
+        }
+        instance.clean();
     }
 }
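
For context, the sketch below illustrates the count-based partitioning this patch introduces in BaseArangoData: the collection size is read once with "RETURN LENGTH(<collection>)", and each worker thread is then given its own "LIMIT offset,count" window. It is a standalone approximation, not project code; the class name, the buildQuery helper, the pool size, and the document count are invented for the demo, and only the query shape mirrors the added getQuerySql(Long, int, String) overload.

import java.util.ArrayList;
import java.util.List;

// Standalone sketch of the LIMIT-window partitioning added in BaseArangoData.
// All names and numbers here are illustrative assumptions.
public class LimitPartitionSketch {

    // Mirrors the new getQuerySql(Long cnt, int threadNumber, String table) overload:
    // every thread gets a window of sepNum documents starting at offsetNum.
    static String buildQuery(long totalCount, int threadNumber, int threadPoolNumber, String table) {
        long sepNum = totalCount / threadPoolNumber + 1;  // window size, rounded up so all docs are covered
        long offsetNum = (long) threadNumber * sepNum;    // start of this thread's window
        return "FOR doc IN " + table + " limit " + offsetNum + "," + sepNum + " RETURN doc";
    }

    public static void main(String[] args) {
        int threadPoolNumber = 4;   // assumed pool size (ApplicationConfig.THREAD_POOL_NUMBER in the project)
        long totalCount = 1000L;    // assumed result of RETURN LENGTH(FQDN)
        List<String> queries = new ArrayList<>();
        for (int i = 0; i < threadPoolNumber; i++) {
            queries.add(buildQuery(totalCount, i, threadPoolNumber, "FQDN"));
        }
        queries.forEach(System.out::println);
    }
}

With these demo inputs each thread scans a disjoint 251-document window (offsets 0, 251, 502, 753), which is the same arithmetic the patched history-read path relies on in place of the previous FIRST_FOUND_TIME range splitting.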