修改读取 ArangoDB 的方式为分页读取。
This commit is contained in:
@@ -10,6 +10,7 @@ import com.arangodb.entity.BaseEdgeDocument;
|
|||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
|
||||||
@@ -43,9 +44,11 @@ public class BaseArangoData {
|
|||||||
historyMap.put(i, new ConcurrentHashMap<>());
|
historyMap.put(i, new ConcurrentHashMap<>());
|
||||||
}
|
}
|
||||||
CountDownLatch countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER);
|
CountDownLatch countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER);
|
||||||
long[] timeRange = getTimeRange(table);
|
// long[] timeRange = getTimeRange(table);
|
||||||
|
Long countTotal = getCountTotal(table);
|
||||||
for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) {
|
for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) {
|
||||||
String sql = getQuerySql(timeRange, i, table);
|
// String sql = getQuerySql(timeRange, i, table);
|
||||||
|
String sql = getQuerySql(countTotal, i, table);
|
||||||
ReadHistoryArangoData<T> readHistoryArangoData = new ReadHistoryArangoData<>(arangoDBConnect, sql, historyMap, type, table, countDownLatch);
|
ReadHistoryArangoData<T> readHistoryArangoData = new ReadHistoryArangoData<>(arangoDBConnect, sql, historyMap, type, table, countDownLatch);
|
||||||
threadPool.executor(readHistoryArangoData);
|
threadPool.executor(readHistoryArangoData);
|
||||||
}
|
}
|
||||||
@@ -91,6 +94,23 @@ public class BaseArangoData {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Long getCountTotal(String table){
|
||||||
|
long start = System.currentTimeMillis();
|
||||||
|
Long cnt = 0L;
|
||||||
|
String sql = "RETURN LENGTH("+table+")";
|
||||||
|
try {
|
||||||
|
ArangoCursor<Long> longs = arangoDBConnect.executorQuery(sql, Long.class);
|
||||||
|
while (longs.hasNext()){
|
||||||
|
cnt = longs.next();
|
||||||
|
}
|
||||||
|
}catch (Exception e){
|
||||||
|
LOG.error(sql +"执行异常");
|
||||||
|
}
|
||||||
|
long last = System.currentTimeMillis();
|
||||||
|
LOG.info(sql+" 结果:"+cnt+" 执行时间:"+(last-start));
|
||||||
|
return cnt;
|
||||||
|
}
|
||||||
|
|
||||||
private String getQuerySql(long[] timeRange, int threadNumber, String table) {
|
private String getQuerySql(long[] timeRange, int threadNumber, String table) {
|
||||||
long minTime = timeRange[0];
|
long minTime = timeRange[0];
|
||||||
long maxTime = timeRange[1];
|
long maxTime = timeRange[1];
|
||||||
@@ -100,4 +120,10 @@ public class BaseArangoData {
|
|||||||
return "FOR doc IN " + table + " filter doc.FIRST_FOUND_TIME >= " + minThreadTime + " and doc.FIRST_FOUND_TIME <= " + maxThreadTime + " " + ApplicationConfig.ARANGODB_READ_LIMIT + " RETURN doc";
|
return "FOR doc IN " + table + " filter doc.FIRST_FOUND_TIME >= " + minThreadTime + " and doc.FIRST_FOUND_TIME <= " + maxThreadTime + " " + ApplicationConfig.ARANGODB_READ_LIMIT + " RETURN doc";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private String getQuerySql(Long cnt,int threadNumber, String table){
|
||||||
|
long sepNum = cnt / ApplicationConfig.THREAD_POOL_NUMBER + 1;
|
||||||
|
long offsetNum = threadNumber * sepNum;
|
||||||
|
return "FOR doc IN " + table + " limit "+offsetNum+","+sepNum+" RETURN doc";
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -54,9 +54,9 @@ public class UpdateGraphData {
|
|||||||
LocateFqdn2Ip.class,BaseEdgeDocument.class,
|
LocateFqdn2Ip.class,BaseEdgeDocument.class,
|
||||||
ReadClickhouseData::getRelationshipFqdnAddressIpSql, ReadClickhouseData::getRelationFqdnAddressIpDocument);
|
ReadClickhouseData::getRelationshipFqdnAddressIpSql, ReadClickhouseData::getRelationFqdnAddressIpDocument);
|
||||||
|
|
||||||
// updateDocument(newRelationIpVisitFqdnMap,historyRelationIpVisitFqdnMap,"R_VISIT_IP2FQDN",
|
updateDocument(newRelationIpVisitFqdnMap,historyRelationIpVisitFqdnMap,"R_VISIT_IP2FQDN",
|
||||||
// VisitIp2Fqdn.class,BaseEdgeDocument.class,
|
VisitIp2Fqdn.class,BaseEdgeDocument.class,
|
||||||
// ReadClickhouseData::getRelationshipIpVisitFqdnSql, ReadClickhouseData::getRelationIpVisitFqdnDocument);
|
ReadClickhouseData::getRelationshipIpVisitFqdnSql, ReadClickhouseData::getRelationIpVisitFqdnDocument);
|
||||||
|
|
||||||
updateDocument(newRelationFqdnSameFqdnMap,historyRelationFqdnSameFqdnMap,"R_SAME_ORIGIN_FQDN2FQDN",
|
updateDocument(newRelationFqdnSameFqdnMap,historyRelationFqdnSameFqdnMap,"R_SAME_ORIGIN_FQDN2FQDN",
|
||||||
SameFqdn2Fqdn.class,BaseEdgeDocument.class,
|
SameFqdn2Fqdn.class,BaseEdgeDocument.class,
|
||||||
|
|||||||
@@ -3,8 +3,7 @@ arangoDB.host=192.168.40.182
|
|||||||
arangoDB.port=8529
|
arangoDB.port=8529
|
||||||
arangoDB.user=upsert
|
arangoDB.user=upsert
|
||||||
arangoDB.password=ceiec2018
|
arangoDB.password=ceiec2018
|
||||||
arangoDB.DB.name=ip-learning-test-0
|
arangoDB.DB.name=ip-learning-test
|
||||||
#arangoDB.DB.name=insert_iplearn_index
|
|
||||||
arangoDB.batch=100000
|
arangoDB.batch=100000
|
||||||
arangoDB.ttl=3600
|
arangoDB.ttl=3600
|
||||||
|
|
||||||
@@ -16,12 +15,12 @@ thread.await.termination.time=10
|
|||||||
|
|
||||||
|
|
||||||
#读取clickhouse时间范围方式,0:读取过去一小时,1:指定时间范围
|
#读取clickhouse时间范围方式,0:读取过去一小时,1:指定时间范围
|
||||||
clickhouse.time.limit.type=1
|
clickhouse.time.limit.type=0
|
||||||
read.clickhouse.max.time=1571245230
|
read.clickhouse.max.time=1571245230
|
||||||
read.clickhouse.min.time=1571245220
|
read.clickhouse.min.time=1571245220
|
||||||
|
|
||||||
#读取arangoDB时间范围方式,0:正常读,1:指定时间范围
|
#读取arangoDB时间范围方式,0:正常读,1:指定时间范围
|
||||||
arango.time.limit.type=1
|
arango.time.limit.type=0
|
||||||
read.arango.max.time=1571245220
|
read.arango.max.time=1571245220
|
||||||
read.arango.min.time=1571245210
|
read.arango.min.time=1571245210
|
||||||
|
|
||||||
|
|||||||
@@ -1,10 +1,27 @@
|
|||||||
package cn.ac.iie;
|
package cn.ac.iie;
|
||||||
|
|
||||||
import cn.ac.iie.dao.BaseArangoData;
|
import cn.ac.iie.dao.BaseArangoData;
|
||||||
|
import cn.ac.iie.utils.ArangoDBConnect;
|
||||||
|
import com.arangodb.ArangoCursor;
|
||||||
|
import com.arangodb.entity.BaseDocument;
|
||||||
import com.arangodb.entity.BaseEdgeDocument;
|
import com.arangodb.entity.BaseEdgeDocument;
|
||||||
|
|
||||||
public class readHistoryDataTest {
|
public class readHistoryDataTest {
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
BaseArangoData baseArangoData = new BaseArangoData();
|
ArangoDBConnect instance = ArangoDBConnect.getInstance();
|
||||||
|
// ArangoCursor<Long> baseDocuments = instance.executorQuery("RETURN LENGTH(R_LOCATE_FQDN2IP)", Long.class);
|
||||||
|
// while (baseDocuments.hasNext()){
|
||||||
|
// Long next = baseDocuments.next();
|
||||||
|
// System.out.println(next.toString());
|
||||||
|
// }
|
||||||
|
// instance.clean();
|
||||||
|
|
||||||
|
String sql = "FOR doc IN FQDN filter doc.FIRST_FOUND_TIME >= 1595423493 and doc.FIRST_FOUND_TIME <= 1595809766 limit 763,10 RETURN doc";
|
||||||
|
ArangoCursor<BaseDocument> baseDocuments = instance.executorQuery(sql, BaseDocument.class);
|
||||||
|
while (baseDocuments.hasNext()){
|
||||||
|
BaseDocument next = baseDocuments.next();
|
||||||
|
System.out.println(next.toString());
|
||||||
|
}
|
||||||
|
instance.clean();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user