整合YSP项目
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
package cn.ac.iie.dao;
|
||||
|
||||
import cn.ac.iie.config.ApplicationConfig;
|
||||
import cn.ac.iie.service.ingestion.ReadHistoryArangoData;
|
||||
import cn.ac.iie.service.read.ReadHistoryArangoData;
|
||||
import cn.ac.iie.utils.ArangoDBConnect;
|
||||
import cn.ac.iie.utils.ExecutorThreadPool;
|
||||
import com.arangodb.ArangoCursor;
|
||||
@@ -15,79 +15,89 @@ import java.util.concurrent.CountDownLatch;
|
||||
|
||||
/**
|
||||
* 获取arangoDB历史数据
|
||||
*
|
||||
* @author wlh
|
||||
*/
|
||||
public class BaseArangoData {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(BaseArangoData.class);
|
||||
|
||||
static ConcurrentHashMap<Integer,ConcurrentHashMap<String, BaseDocument>> historyVertexFqdnMap = new ConcurrentHashMap<>();
|
||||
static ConcurrentHashMap<Integer,ConcurrentHashMap<String, BaseDocument>> historyVertexIpMap = new ConcurrentHashMap<>();
|
||||
static ConcurrentHashMap<Integer,ConcurrentHashMap<String, BaseDocument>> historyVertexSubscriberMap = new ConcurrentHashMap<>();
|
||||
static ConcurrentHashMap<Integer,ConcurrentHashMap<String, BaseEdgeDocument>> historyRelationFqdnAddressIpMap = new ConcurrentHashMap<>();
|
||||
static ConcurrentHashMap<Integer,ConcurrentHashMap<String, BaseEdgeDocument>> historyRelationIpVisitFqdnMap = new ConcurrentHashMap<>();
|
||||
static ConcurrentHashMap<Integer,ConcurrentHashMap<String, BaseEdgeDocument>> historyRelationFqdnSameFqdnMap = new ConcurrentHashMap<>();
|
||||
static ConcurrentHashMap<Integer,ConcurrentHashMap<String, BaseEdgeDocument>> historyRelationSubsciberLocateIpMap = new ConcurrentHashMap<>();
|
||||
static ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseDocument>> historyVertexFqdnMap = new ConcurrentHashMap<>();
|
||||
static ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseDocument>> historyVertexIpMap = new ConcurrentHashMap<>();
|
||||
static ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseDocument>> historyVertexSubscriberMap = new ConcurrentHashMap<>();
|
||||
static ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseEdgeDocument>> historyRelationFqdnAddressIpMap = new ConcurrentHashMap<>();
|
||||
static ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseEdgeDocument>> historyRelationIpVisitFqdnMap = new ConcurrentHashMap<>();
|
||||
static ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseEdgeDocument>> historyRelationFqdnSameFqdnMap = new ConcurrentHashMap<>();
|
||||
static ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseEdgeDocument>> historyRelationSubsciberLocateIpMap = new ConcurrentHashMap<>();
|
||||
|
||||
private static ArangoDBConnect arangoDBConnect = ArangoDBConnect.getInstance();
|
||||
|
||||
private ExecutorThreadPool threadPool = ExecutorThreadPool.getInstance();
|
||||
|
||||
<T extends BaseDocument> void readHistoryData(String table,
|
||||
ConcurrentHashMap<Integer,ConcurrentHashMap<String, T>> map,
|
||||
ConcurrentHashMap<Integer, ConcurrentHashMap<String, T>> historyMap,
|
||||
Class<T> type) {
|
||||
try {
|
||||
LOG.info("开始更新"+table);
|
||||
LOG.info("开始更新" + table);
|
||||
long start = System.currentTimeMillis();
|
||||
for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++){
|
||||
map.put(i,new ConcurrentHashMap<>());
|
||||
for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) {
|
||||
historyMap.put(i, new ConcurrentHashMap<>());
|
||||
}
|
||||
CountDownLatch countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER);
|
||||
long[] timeRange = getTimeRange(table);
|
||||
for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) {
|
||||
String sql = getQuerySql(timeRange, i, table);
|
||||
ReadHistoryArangoData<T> readHistoryArangoData =
|
||||
new ReadHistoryArangoData<>(arangoDBConnect, sql, map,type,table,countDownLatch);
|
||||
ReadHistoryArangoData<T> readHistoryArangoData = new ReadHistoryArangoData<>(arangoDBConnect, sql, historyMap, type, table, countDownLatch);
|
||||
threadPool.executor(readHistoryArangoData);
|
||||
}
|
||||
countDownLatch.await();
|
||||
long last = System.currentTimeMillis();
|
||||
LOG.info("读取"+table+" arangoDB 共耗时:"+(last-start));
|
||||
}catch (Exception e){
|
||||
LOG.info("读取" + table + " arangoDB 共耗时:" + (last - start));
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
private long[] getTimeRange(String table){
|
||||
private long[] getTimeRange(String table) {
|
||||
long minTime = 0L;
|
||||
long maxTime = 0L;
|
||||
long startTime = System.currentTimeMillis();
|
||||
String sql = "LET doc = (FOR doc IN "+table+" RETURN doc) return {max_time:MAX(doc[*].FIRST_FOUND_TIME),min_time:MIN(doc[*].FIRST_FOUND_TIME)}";
|
||||
ArangoCursor<BaseDocument> timeDoc = arangoDBConnect.executorQuery(sql, BaseDocument.class);
|
||||
try {
|
||||
if (timeDoc != null){
|
||||
while (timeDoc.hasNext()) {
|
||||
BaseDocument doc = timeDoc.next();
|
||||
maxTime = Long.parseLong(doc.getAttribute("max_time").toString()) + ApplicationConfig.THREAD_POOL_NUMBER;
|
||||
minTime = Long.parseLong(doc.getAttribute("min_time").toString());
|
||||
String sql = "LET doc = (FOR doc IN " + table + " RETURN doc) return {max_time:MAX(doc[*].FIRST_FOUND_TIME),min_time:MIN(doc[*].FIRST_FOUND_TIME)}";
|
||||
switch (ApplicationConfig.ARANGO_TIME_LIMIT_TYPE) {
|
||||
case 0:
|
||||
ArangoCursor<BaseDocument> timeDoc = arangoDBConnect.executorQuery(sql, BaseDocument.class);
|
||||
try {
|
||||
if (timeDoc != null) {
|
||||
while (timeDoc.hasNext()) {
|
||||
BaseDocument doc = timeDoc.next();
|
||||
maxTime = Long.parseLong(doc.getAttribute("max_time").toString()) + ApplicationConfig.THREAD_POOL_NUMBER;
|
||||
minTime = Long.parseLong(doc.getAttribute("min_time").toString());
|
||||
}
|
||||
} else {
|
||||
LOG.warn("获取ArangoDb时间范围为空");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
long lastTime = System.currentTimeMillis();
|
||||
LOG.info(sql+"\n查询最大最小时间用时:" + (lastTime - startTime));
|
||||
}else {
|
||||
LOG.warn("获取ArangoDb时间范围为空");
|
||||
}
|
||||
}catch (Exception e){
|
||||
e.printStackTrace();
|
||||
break;
|
||||
case 1:
|
||||
maxTime = ApplicationConfig.READ_ARANGO_MAX_TIME;
|
||||
minTime = ApplicationConfig.READ_ARANGO_MIN_TIME;
|
||||
break;
|
||||
default:
|
||||
}
|
||||
long lastTime = System.currentTimeMillis();
|
||||
LOG.info(sql + "\n查询最大最小时间用时:" + (lastTime - startTime));
|
||||
return new long[]{minTime, maxTime};
|
||||
|
||||
}
|
||||
|
||||
private String getQuerySql(long[] timeRange,int threadNumber,String table){
|
||||
private String getQuerySql(long[] timeRange, int threadNumber, String table) {
|
||||
long minTime = timeRange[0];
|
||||
long maxTime = timeRange[1];
|
||||
long diffTime = (maxTime - minTime) / ApplicationConfig.THREAD_POOL_NUMBER;
|
||||
long maxThreadTime = minTime + (threadNumber + 1)* diffTime;
|
||||
long maxThreadTime = minTime + (threadNumber + 1) * diffTime;
|
||||
long minThreadTime = minTime + threadNumber * diffTime;
|
||||
return "FOR doc IN "+table+" filter doc.FIRST_FOUND_TIME >= "+minThreadTime+" and doc.FIRST_FOUND_TIME <= "+maxThreadTime+" " + ApplicationConfig.ARANGODB_READ_LIMIT + " RETURN doc";
|
||||
return "FOR doc IN " + table + " filter doc.FIRST_FOUND_TIME >= " + minThreadTime + " and doc.FIRST_FOUND_TIME <= " + maxThreadTime + " " + ApplicationConfig.ARANGODB_READ_LIMIT + " RETURN doc";
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -15,7 +15,7 @@ import java.util.HashMap;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import static cn.ac.iie.service.ingestion.ReadClickhouseData.*;
|
||||
import static cn.ac.iie.service.read.ReadClickhouseData.putMapByHashcode;
|
||||
|
||||
/**
|
||||
* 读取clickhouse数据,封装到map
|
||||
@@ -24,23 +24,25 @@ import static cn.ac.iie.service.ingestion.ReadClickhouseData.*;
|
||||
public class BaseClickhouseData {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(BaseClickhouseData.class);
|
||||
|
||||
private static ClickhouseConnect manger = ClickhouseConnect.getInstance();
|
||||
static HashMap<Integer, HashMap<String, ArrayList<BaseDocument>>> newVertexFqdnMap = new HashMap<>();
|
||||
static HashMap<Integer, HashMap<String, ArrayList<BaseDocument>>> newVertexIpMap = new HashMap<>();
|
||||
static HashMap<Integer, HashMap<String,ArrayList<BaseDocument>>> newVertexSubscriberMap = new HashMap<>();
|
||||
static HashMap<Integer, HashMap<String, ArrayList<BaseEdgeDocument>>> newRelationFqdnAddressIpMap = new HashMap<>();
|
||||
static HashMap<Integer, HashMap<String, ArrayList<BaseEdgeDocument>>> newRelationIpVisitFqdnMap = new HashMap<>();
|
||||
static HashMap<Integer, HashMap<String, ArrayList<BaseEdgeDocument>>> newRelationSubsciberLocateIpMap = new HashMap<>();
|
||||
static HashMap<Integer, HashMap<String, ArrayList<BaseEdgeDocument>>> newRelationFqdnSameFqdnMap = new HashMap<>();
|
||||
|
||||
private static ClickhouseConnect manger = ClickhouseConnect.getInstance();
|
||||
private DruidPooledConnection connection;
|
||||
private Statement statement;
|
||||
|
||||
<T extends BaseDocument> void baseDocumentFromClickhouse(HashMap<Integer, HashMap<String, ArrayList<T>>> newMap,
|
||||
Supplier<String> getSqlSupplier,
|
||||
Function<ResultSet,T> formatResultFunc) {
|
||||
Function<ResultSet,T> formatResultFunc){
|
||||
long start = System.currentTimeMillis();
|
||||
initializeMap(newMap);
|
||||
String sql = getSqlSupplier.get();
|
||||
LOG.info(sql);
|
||||
try {
|
||||
connection = manger.getConnection();
|
||||
statement = connection.createStatement();
|
||||
@@ -54,7 +56,7 @@ public class BaseClickhouseData {
|
||||
}
|
||||
}
|
||||
long last = System.currentTimeMillis();
|
||||
LOG.info(sql + "\n读取"+i+"条数据,运行时间:" + (last - start));
|
||||
LOG.info("读取"+i+"条数据,运行时间:" + (last - start));
|
||||
}catch (Exception e){
|
||||
e.printStackTrace();
|
||||
}finally {
|
||||
@@ -65,7 +67,7 @@ public class BaseClickhouseData {
|
||||
private <T extends BaseDocument> void initializeMap(HashMap<Integer, HashMap<String,ArrayList<T>>> map){
|
||||
try {
|
||||
for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) {
|
||||
map.put(i, new HashMap<>());
|
||||
map.put(i, new HashMap<>(16));
|
||||
}
|
||||
}catch (Exception e){
|
||||
e.printStackTrace();
|
||||
|
||||
@@ -1,14 +1,13 @@
|
||||
package cn.ac.iie.dao;
|
||||
|
||||
import cn.ac.iie.config.ApplicationConfig;
|
||||
import cn.ac.iie.service.ingestion.ReadClickhouseData;
|
||||
import cn.ac.iie.service.read.ReadClickhouseData;
|
||||
import cn.ac.iie.service.update.Document;
|
||||
import cn.ac.iie.service.update.relationship.LocateFqdn2Ip;
|
||||
import cn.ac.iie.service.update.relationship.LocateSubscriber2Ip;
|
||||
import cn.ac.iie.service.update.relationship.SameFqdn2Fqdn;
|
||||
import cn.ac.iie.service.update.relationship.VisitIp2Fqdn;
|
||||
import cn.ac.iie.service.update.vertex.Fqdn;
|
||||
import cn.ac.iie.service.update.vertex.Ip;
|
||||
import cn.ac.iie.service.update.vertex.Subscriber;
|
||||
import cn.ac.iie.utils.ArangoDBConnect;
|
||||
import cn.ac.iie.utils.ExecutorThreadPool;
|
||||
import com.arangodb.entity.BaseDocument;
|
||||
@@ -36,41 +35,36 @@ public class UpdateGraphData {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(UpdateGraphData.class);
|
||||
private static ExecutorThreadPool pool = ExecutorThreadPool.getInstance();
|
||||
private static ArangoDBConnect arangoManger = ArangoDBConnect.getInstance();
|
||||
|
||||
private static BaseArangoData baseArangoData = new BaseArangoData();
|
||||
private static BaseClickhouseData baseClickhouseData = new BaseClickhouseData();
|
||||
|
||||
|
||||
public void updateArango(){
|
||||
long start = System.currentTimeMillis();
|
||||
try {
|
||||
|
||||
updateDocument(newVertexFqdnMap, historyVertexFqdnMap, "FQDN",
|
||||
Fqdn.class,BaseDocument.class,
|
||||
ReadClickhouseData::getVertexFqdnSql,ReadClickhouseData::getVertexFqdnDocument);
|
||||
ReadClickhouseData::getVertexFqdnSql, ReadClickhouseData::getVertexFqdnDocument);
|
||||
|
||||
updateDocument(newVertexIpMap,historyVertexIpMap,"IP",
|
||||
Ip.class,BaseDocument.class,
|
||||
ReadClickhouseData::getVertexIpSql,ReadClickhouseData::getVertexIpDocument);
|
||||
|
||||
// updateDocument(newVertexSubscriberMap,historyVertexSubscriberMap,"SUBSCRIBER",
|
||||
// Subscriber.class,BaseDocument.class,
|
||||
// ReadClickhouseData::getVertexSubscriberSql,ReadClickhouseData::getVertexSubscriberDocument);
|
||||
ReadClickhouseData::getVertexIpSql, ReadClickhouseData::getVertexIpDocument);
|
||||
|
||||
updateDocument(newRelationFqdnAddressIpMap,historyRelationFqdnAddressIpMap,"R_LOCATE_FQDN2IP",
|
||||
LocateFqdn2Ip.class,BaseEdgeDocument.class,
|
||||
ReadClickhouseData::getRelationshipFqdnAddressIpSql,ReadClickhouseData::getRelationFqdnAddressIpDocument);
|
||||
ReadClickhouseData::getRelationshipFqdnAddressIpSql, ReadClickhouseData::getRelationFqdnAddressIpDocument);
|
||||
|
||||
// updateDocument(newRelationIpVisitFqdnMap,historyRelationIpVisitFqdnMap,"R_VISIT_IP2FQDN",
|
||||
// VisitIp2Fqdn.class,BaseEdgeDocument.class,
|
||||
// ReadClickhouseData::getRelationshipIpVisitFqdnSql,ReadClickhouseData::getRelationIpVisitFqdnDocument);
|
||||
updateDocument(newRelationIpVisitFqdnMap,historyRelationIpVisitFqdnMap,"R_VISIT_IP2FQDN",
|
||||
VisitIp2Fqdn.class,BaseEdgeDocument.class,
|
||||
ReadClickhouseData::getRelationshipIpVisitFqdnSql, ReadClickhouseData::getRelationIpVisitFqdnDocument);
|
||||
|
||||
// updateDocument(newRelationSubsciberLocateIpMap,historyRelationSubsciberLocateIpMap,"R_LOCATE_SUBSCRIBER2IP",
|
||||
// LocateSubscriber2Ip.class,BaseEdgeDocument.class,
|
||||
// ReadClickhouseData::getRelationshipSubsciberLocateIpSql,ReadClickhouseData::getRelationshipSubsciberLocateIpDocument);
|
||||
updateDocument(newRelationFqdnSameFqdnMap,historyRelationFqdnSameFqdnMap,"R_SAME_ORIGIN_FQDN2FQDN",
|
||||
SameFqdn2Fqdn.class,BaseEdgeDocument.class,
|
||||
ReadClickhouseData::getRelationshipFqdnSameFqdnSql, ReadClickhouseData::getRelationshipFqdnSameFqdnDocument);
|
||||
|
||||
|
||||
long last = System.currentTimeMillis();
|
||||
LOG.info("iplearning application运行完毕,用时:"+(last - start));
|
||||
LOG.info("更新图数据库时间共计:"+(last - start));
|
||||
}catch (Exception e){
|
||||
e.printStackTrace();
|
||||
}finally {
|
||||
@@ -79,15 +73,13 @@ public class UpdateGraphData {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private <T extends BaseDocument> void updateDocument(HashMap<Integer, HashMap<String, ArrayList<T>>> newMap,
|
||||
ConcurrentHashMap<Integer, ConcurrentHashMap<String, T>> historyMap,
|
||||
String collection,
|
||||
Class<? extends Document<T>> taskType,
|
||||
Class<T> docmentType,
|
||||
Supplier<String> getSqlSupplier,
|
||||
Function<ResultSet,T> formatResultFunc
|
||||
) {
|
||||
Function<ResultSet,T> formatResultFunc) {
|
||||
try {
|
||||
|
||||
baseArangoData.readHistoryData(collection,historyMap,docmentType);
|
||||
|
||||
Reference in New Issue
Block a user