修改distinct client IP存储时间为24小时
This commit is contained in:
@@ -31,7 +31,9 @@ public class BaseArangoData {
|
||||
|
||||
private ExecutorThreadPool threadPool = ExecutorThreadPool.getInstance();
|
||||
|
||||
<T extends BaseDocument> void readHistoryData(String table,ConcurrentHashMap<Integer,ConcurrentHashMap<String, T>> map,Class<T> type){
|
||||
<T extends BaseDocument> void readHistoryData(String table,
|
||||
ConcurrentHashMap<Integer,ConcurrentHashMap<String, T>> map,
|
||||
Class<T> type) {
|
||||
try {
|
||||
LOG.info("开始更新"+table);
|
||||
long start = System.currentTimeMillis();
|
||||
@@ -42,7 +44,8 @@ public class BaseArangoData {
|
||||
long[] timeRange = getTimeRange(table);
|
||||
for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) {
|
||||
String sql = getQuerySql(timeRange, i, table);
|
||||
ReadHistoryArangoData<T> readHistoryArangoData = new ReadHistoryArangoData<>(arangoDBConnect, sql, map,type,table,countDownLatch);
|
||||
ReadHistoryArangoData<T> readHistoryArangoData =
|
||||
new ReadHistoryArangoData<>(arangoDBConnect, sql, map,type,table,countDownLatch);
|
||||
threadPool.executor(readHistoryArangoData);
|
||||
}
|
||||
countDownLatch.await();
|
||||
|
||||
@@ -35,7 +35,9 @@ public class BaseClickhouseData {
|
||||
private DruidPooledConnection connection;
|
||||
private Statement statement;
|
||||
|
||||
<T extends BaseDocument> void baseDocumentFromClickhouse(HashMap<Integer, HashMap<String, ArrayList<T>>> newMap, Supplier<String> getSqlSupplier, Function<ResultSet,T> formatResultFunc){
|
||||
<T extends BaseDocument> void baseDocumentFromClickhouse(HashMap<Integer, HashMap<String, ArrayList<T>>> newMap,
|
||||
Supplier<String> getSqlSupplier,
|
||||
Function<ResultSet,T> formatResultFunc) {
|
||||
long start = System.currentTimeMillis();
|
||||
initializeMap(newMap);
|
||||
String sql = getSqlSupplier.get();
|
||||
|
||||
@@ -60,9 +60,9 @@ public class UpdateGraphData {
|
||||
LocateFqdn2Ip.class,BaseEdgeDocument.class,
|
||||
ReadClickhouseData::getRelationshipFqdnAddressIpSql,ReadClickhouseData::getRelationFqdnAddressIpDocument);
|
||||
|
||||
updateDocument(newRelationIpVisitFqdnMap,historyRelationIpVisitFqdnMap,"R_VISIT_IP2FQDN",
|
||||
VisitIp2Fqdn.class,BaseEdgeDocument.class,
|
||||
ReadClickhouseData::getRelationshipIpVisitFqdnSql,ReadClickhouseData::getRelationIpVisitFqdnDocument);
|
||||
// updateDocument(newRelationIpVisitFqdnMap,historyRelationIpVisitFqdnMap,"R_VISIT_IP2FQDN",
|
||||
// VisitIp2Fqdn.class,BaseEdgeDocument.class,
|
||||
// ReadClickhouseData::getRelationshipIpVisitFqdnSql,ReadClickhouseData::getRelationIpVisitFqdnDocument);
|
||||
|
||||
updateDocument(newRelationSubsciberLocateIpMap,historyRelationSubsciberLocateIpMap,"R_LOCATE_SUBSCRIBER2IP",
|
||||
LocateSubscriber2Ip.class,BaseEdgeDocument.class,
|
||||
@@ -106,7 +106,7 @@ public class UpdateGraphData {
|
||||
String.class,
|
||||
ConcurrentHashMap.class,
|
||||
CountDownLatch.class);
|
||||
Document<T> docTask = (Document<T>)constructor.newInstance(tmpNewMap, arangoManger, collection, tmpHisMap, countDownLatch);
|
||||
Document docTask = (Document)constructor.newInstance(tmpNewMap, arangoManger, collection, tmpHisMap, countDownLatch);
|
||||
pool.executor(docTask);
|
||||
}
|
||||
countDownLatch.await();
|
||||
|
||||
Reference in New Issue
Block a user