抽取公共方法,重构代码逻辑

This commit is contained in:
wanglihui
2020-07-08 19:44:46 +08:00
parent 78664828e1
commit 0e926aa7d0
25 changed files with 911 additions and 1052 deletions

View File

@@ -1,10 +1,7 @@
package cn.ac.iie.dao;
import cn.ac.iie.config.ApplicationConfig;
import cn.ac.iie.etl.fqdn2ip.ArangoEFqdnAddressIpToMap;
import cn.ac.iie.etl.ip2fqdn.ArangoEIpVisitFqdnToMap;
import cn.ac.iie.etl.fqdn.ArangoVFqdnToMap;
import cn.ac.iie.etl.ip.ArangoVIpToMap;
import cn.ac.iie.etl.read.ReadHistoryArangoData;
import cn.ac.iie.utils.ArangoDBConnect;
import cn.ac.iie.utils.ExecutorThreadPool;
import com.arangodb.ArangoCursor;
@@ -18,8 +15,8 @@ import java.util.concurrent.ConcurrentHashMap;
public class BaseArangoData {
private static final Logger LOG = LoggerFactory.getLogger(BaseArangoData.class);
public static ConcurrentHashMap<String, BaseDocument> v_Fqdn_Map = new ConcurrentHashMap<>();
public static ConcurrentHashMap<String, BaseDocument> v_Ip_Map = new ConcurrentHashMap<>();
public static ConcurrentHashMap<String, BaseEdgeDocument> v_Fqdn_Map = new ConcurrentHashMap<>();
public static ConcurrentHashMap<String, BaseEdgeDocument> v_Ip_Map = new ConcurrentHashMap<>();
public static ConcurrentHashMap<String, BaseEdgeDocument> e_Fqdn_Address_Ip_Map = new ConcurrentHashMap<>();
public static ConcurrentHashMap<String, BaseEdgeDocument> e_Ip_Visit_Fqdn_Map = new ConcurrentHashMap<>();
@@ -27,48 +24,40 @@ public class BaseArangoData {
private static final ExecutorThreadPool threadPool = ExecutorThreadPool.getInstance();
public static void BaseVFqdnDataMap() {
String sql = "LET FQDN = (FOR doc IN FQDN RETURN doc) return {max_time:MAX(FQDN[*].FIRST_FOUND_TIME),min_time:MIN(FQDN[*].FIRST_FOUND_TIME)}";
long[] timeLimit = getTimeLimit(sql);
for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) {
ArangoVFqdnToMap ArangoVFqdnToMap = new ArangoVFqdnToMap(arangoDBConnect, timeLimit[0], timeLimit[2],i);
threadPool.executor(ArangoVFqdnToMap);
/**
 * Loads the four ArangoDB history collections into their shared in-memory
 * maps, waits for the worker pool to drain, then logs map sizes and the
 * total load time.
 */
public void baseDocumentDataMap(){
    long startA = System.currentTimeMillis();
    readHistoryData("FQDN", v_Fqdn_Map);
    readHistoryData("IP", v_Ip_Map);
    readHistoryData("R_LOCATE_FQDN2IP", e_Fqdn_Address_Ip_Map);
    readHistoryData("R_VISIT_IP2FQDN", e_Ip_Visit_Fqdn_Map);
    // Stop accepting new tasks, then block until every queued read finishes
    // so the sizes logged below are final.
    threadPool.shutdown();
    threadPool.awaitThreadTask();
    // SLF4J parameterized logging: same output as concatenation, but the
    // message is only assembled when the level is enabled.
    LOG.info("v_Fqdn_Map大小{}", BaseArangoData.v_Fqdn_Map.size());
    LOG.info("v_Ip_Map大小{}", BaseArangoData.v_Ip_Map.size());
    LOG.info("e_Fqdn_Address_Ip_Map大小{}", BaseArangoData.e_Fqdn_Address_Ip_Map.size());
    LOG.info("e_Ip_Visit_Fqdn_Map大小{}", BaseArangoData.e_Ip_Visit_Fqdn_Map.size());
    long lastA = System.currentTimeMillis();
    LOG.info("读取ArangoDb时间{}", lastA - startA);
}
/**
 * Splits the collection's FIRST_FOUND_TIME range into THREAD_POOL_NUMBER
 * slices and submits one read task per slice to the shared pool.
 *
 * @param table ArangoDB collection name to read
 * @param map   destination map populated concurrently by the worker tasks
 */
private void readHistoryData(String table, ConcurrentHashMap<String, BaseEdgeDocument> map){
    try {
        long[] timeRange = getTimeRange(table);
        for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) {
            String sql = getQuerySql(timeRange, i, table);
            ReadHistoryArangoData readHistoryArangoData = new ReadHistoryArangoData(arangoDBConnect, sql, map);
            threadPool.executor(readHistoryArangoData);
        }
    } catch (Exception e) {
        // Log with the cause attached instead of printStackTrace(), so the
        // failure lands in the application log rather than only on stderr.
        LOG.error("读取" + table + "历史数据失败", e);
    }
}
/**
 * Reads the IP vertex collection into its map: queries the overall
 * FIRST_FOUND_TIME bounds, then fans the read out across the worker pool,
 * one task per slice index.
 */
public static void BaseVIpDataMap() {
    String boundsSql = "LET IP = (FOR doc IN IP RETURN doc) return {max_time:MAX(IP[*].FIRST_FOUND_TIME),min_time:MIN(IP[*].FIRST_FOUND_TIME)}";
    long[] bounds = getTimeLimit(boundsSql);
    int workers = ApplicationConfig.THREAD_POOL_NUMBER;
    for (int slice = 0; slice < workers; slice++) {
        // bounds[0] = min time, bounds[2] = per-thread slice width.
        threadPool.executor(new ArangoVIpToMap(arangoDBConnect, bounds[0], bounds[2], slice));
    }
}
/**
 * Reads the R_LOCATE_FQDN2IP edge collection into its map: queries the
 * overall FIRST_FOUND_TIME bounds, then submits one read task per slice
 * index to the shared pool.
 */
public static void BaseEFqdnAddressIpDataMap(){
    String boundsSql = "LET e = (FOR doc IN R_LOCATE_FQDN2IP RETURN doc) return {max_time:MAX(e[*].FIRST_FOUND_TIME),min_time:MIN(e[*].FIRST_FOUND_TIME)}";
    long[] bounds = getTimeLimit(boundsSql);
    int workers = ApplicationConfig.THREAD_POOL_NUMBER;
    for (int slice = 0; slice < workers; slice++) {
        // bounds[0] = min time, bounds[2] = per-thread slice width.
        threadPool.executor(new ArangoEFqdnAddressIpToMap(arangoDBConnect, bounds[0], bounds[2], slice));
    }
}
/**
 * Reads the R_VISIT_IP2FQDN edge collection into its map: queries the
 * overall FIRST_FOUND_TIME bounds, then submits one read task per slice
 * index to the shared pool.
 */
public static void BaseEIpVisitFqdnDataMap(){
    String boundsSql = "LET e = (FOR doc IN R_VISIT_IP2FQDN RETURN doc) return {max_time:MAX(e[*].FIRST_FOUND_TIME),min_time:MIN(e[*].FIRST_FOUND_TIME)}";
    long[] bounds = getTimeLimit(boundsSql);
    int workers = ApplicationConfig.THREAD_POOL_NUMBER;
    for (int slice = 0; slice < workers; slice++) {
        // bounds[0] = min time, bounds[2] = per-thread slice width.
        threadPool.executor(new ArangoEIpVisitFqdnToMap(arangoDBConnect, bounds[0], bounds[2], slice));
    }
}
private static long[] getTimeLimit(String sql) {
private long[] getTimeRange(String table){
long minTime = 0L;
long maxTime = 0L;
long diffTime = 0L;
long startTime = System.currentTimeMillis();
// LOG.info(sql);
String sql = "LET doc = (FOR doc IN "+table+" RETURN doc) return {max_time:MAX(doc[*].FIRST_FOUND_TIME),min_time:MIN(doc[*].FIRST_FOUND_TIME)}";
ArangoCursor<BaseDocument> timeDoc = arangoDBConnect.executorQuery(sql, BaseDocument.class);
try {
if (timeDoc != null){
@@ -79,14 +68,23 @@ public class BaseArangoData {
}
long lastTime = System.currentTimeMillis();
LOG.info(sql+"\n查询最大最小时间用时" + (lastTime - startTime));
diffTime = (maxTime - minTime) / ApplicationConfig.THREAD_POOL_NUMBER;
}else {
LOG.warn("获取ArangoDb时间范围为空");
}
}catch (Exception e){
LOG.error(e.toString());
e.printStackTrace();
}
return new long[]{minTime, maxTime, diffTime};
return new long[]{minTime, maxTime};
}
/**
 * Builds the AQL query for one worker's slice of the collection's
 * FIRST_FOUND_TIME range.
 *
 * The range is divided into THREAD_POOL_NUMBER slices via integer
 * division. Because diffTime truncates, minTime + N*diffTime can be
 * strictly less than maxTime, which would silently skip documents in the
 * tail of the range — so the last slice is clamped to maxTime.
 *
 * @param timeRange    {minTime, maxTime} as returned by getTimeRange
 * @param threadNumber zero-based slice index
 * @param table        ArangoDB collection name
 * @return AQL query selecting this slice's documents
 */
private String getQuerySql(long[] timeRange,int threadNumber,String table){
    long minTime = timeRange[0];
    long maxTime = timeRange[1];
    long diffTime = (maxTime - minTime) / ApplicationConfig.THREAD_POOL_NUMBER;
    long minThreadTime = minTime + threadNumber * diffTime;
    long maxThreadTime = minTime + (threadNumber + 1)* diffTime;
    if (threadNumber == ApplicationConfig.THREAD_POOL_NUMBER - 1) {
        // Give the integer-division remainder to the final slice so no
        // document with FIRST_FOUND_TIME near maxTime is missed.
        maxThreadTime = maxTime;
    }
    return "FOR doc IN "+table+" filter doc.FIRST_FOUND_TIME >= "+minThreadTime+" and doc.FIRST_FOUND_TIME <= "+maxThreadTime+" RETURN doc";
}
}