diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseArangoData.java b/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseArangoData.java index d711439..c431d9b 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseArangoData.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseArangoData.java @@ -11,6 +11,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; /** * 获取arangoDB历史数据 @@ -18,45 +19,31 @@ import java.util.concurrent.ConcurrentHashMap; public class BaseArangoData { private static final Logger LOG = LoggerFactory.getLogger(BaseArangoData.class); - public static ConcurrentHashMap v_Fqdn_Map = new ConcurrentHashMap<>(); - public static ConcurrentHashMap v_Ip_Map = new ConcurrentHashMap<>(); - public static ConcurrentHashMap v_Subscriber_Map = new ConcurrentHashMap<>(); - public static ConcurrentHashMap e_Fqdn_Address_Ip_Map = new ConcurrentHashMap<>(); - public static ConcurrentHashMap e_Ip_Visit_Fqdn_Map = new ConcurrentHashMap<>(); - public static ConcurrentHashMap e_Subsciber_Locate_Ip_Map = new ConcurrentHashMap<>(); + public static ConcurrentHashMap historyVertexFqdnMap = new ConcurrentHashMap<>(); + public static ConcurrentHashMap historyVertexIpMap = new ConcurrentHashMap<>(); + public static ConcurrentHashMap historyVertexSubscriberMap = new ConcurrentHashMap<>(); + public static ConcurrentHashMap historyRelationFqdnAddressIpMap = new ConcurrentHashMap<>(); + public static ConcurrentHashMap historyRelationIpVisitFqdnMap = new ConcurrentHashMap<>(); + public static ConcurrentHashMap historyRelationSubsciberLocateIpMap = new ConcurrentHashMap<>(); private static ArangoDBConnect arangoDBConnect = ArangoDBConnect.getInstance(); - private static ExecutorThreadPool threadPool = ExecutorThreadPool.getInstance(); + private ExecutorThreadPool threadPool = ExecutorThreadPool.getInstance(); - public void baseDocumentDataMap(){ - long startA = System.currentTimeMillis(); - readHistoryData("FQDN", v_Fqdn_Map,BaseDocument.class); - readHistoryData("IP", v_Ip_Map,BaseDocument.class); - readHistoryData("SUBSCRIBER",v_Subscriber_Map,BaseDocument.class); - readHistoryData("R_LOCATE_FQDN2IP", e_Fqdn_Address_Ip_Map,BaseEdgeDocument.class); - readHistoryData("R_VISIT_IP2FQDN", e_Ip_Visit_Fqdn_Map,BaseEdgeDocument.class); - readHistoryData("R_LOCATE_SUBSCRIBER2IP",e_Subsciber_Locate_Ip_Map,BaseEdgeDocument.class); - threadPool.shutdown(); - threadPool.awaitThreadTask(); - LOG.info("v_Fqdn_Map大小:"+v_Fqdn_Map.size()); - LOG.info("v_Ip_Map大小:"+v_Ip_Map.size()); - LOG.info("v_Subscriber_Map大小:"+v_Subscriber_Map.size()); - LOG.info("e_Fqdn_Address_Ip_Map大小:"+e_Fqdn_Address_Ip_Map.size()); - LOG.info("e_Ip_Visit_Fqdn_Map大小:"+e_Ip_Visit_Fqdn_Map.size()); - LOG.info("e_Subsciber_Locate_Ip_Map大小:"+e_Subsciber_Locate_Ip_Map.size()); - long lastA = System.currentTimeMillis(); - LOG.info("读取ArangoDb时间:"+(lastA - startA)); - } - - private void readHistoryData(String table, ConcurrentHashMap map, Class type){ + void readHistoryData(String table, ConcurrentHashMap map, Class type){ try { + long start = System.currentTimeMillis(); + CountDownLatch countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); long[] timeRange = getTimeRange(table); for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { String sql = getQuerySql(timeRange, i, table); - ReadHistoryArangoData readHistoryArangoData = new ReadHistoryArangoData<>(arangoDBConnect, sql, map,type,table); + ReadHistoryArangoData readHistoryArangoData = new ReadHistoryArangoData<>(arangoDBConnect, sql, map,type,table,countDownLatch); threadPool.executor(readHistoryArangoData); } + countDownLatch.await(); + long last = System.currentTimeMillis(); + LOG.info("读取"+table+" arangoDB 共耗时:"+(last-start)); + LOG.info(table+" history Map大小为:"+map.size()); }catch (Exception e){ e.printStackTrace(); } diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java b/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java index b33b73c..2fdc967 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java @@ -23,18 +23,18 @@ public class BaseClickhouseData { private static final Logger LOG = LoggerFactory.getLogger(BaseClickhouseData.class); private static ClickhouseConnect manger = ClickhouseConnect.getInstance(); - static HashMap>> vFqdnMap = new HashMap<>(); - static HashMap>> vIpMap = new HashMap<>(); - static HashMap>> vSubscriberMap = new HashMap<>(); - static HashMap>> eFqdnAddressIpMap = new HashMap<>(); - static HashMap>> eIpVisitFqdnMap = new HashMap<>(); - static HashMap>> eSubsciberLocateIpMap = new HashMap<>(); + static HashMap>> newVertexFqdnMap = new HashMap<>(); + static HashMap>> newVertexIpMap = new HashMap<>(); + static HashMap>> newVertexSubscriberMap = new HashMap<>(); + static HashMap>> newRelationFqdnAddressIpMap = new HashMap<>(); + static HashMap>> newRelationIpVisitFqdnMap = new HashMap<>(); + static HashMap>> newRelationSubsciberLocateIpMap = new HashMap<>(); private DruidPooledConnection connection; private Statement statement; void baseVertexFqdn() { - initializeMap(vFqdnMap); + initializeMap(newVertexFqdnMap); LOG.info("FQDN resultMap初始化完成"); String sql = getVertexFqdnSql(); long start = System.currentTimeMillis(); @@ -45,7 +45,7 @@ public class BaseClickhouseData { while (resultSet.next()) { BaseDocument newDoc = getVertexFqdnDocument(resultSet); if (newDoc != null) { - putMapByHashcode(newDoc,vFqdnMap); + putMapByHashcode(newDoc, newVertexFqdnMap); } } long last = System.currentTimeMillis(); @@ -58,7 +58,7 @@ public class BaseClickhouseData { } void baseVertexIp() { - initializeMap(vIpMap); + initializeMap(newVertexIpMap); LOG.info("IP resultMap初始化完成"); String sql = getVertexIpSql(); long start = System.currentTimeMillis(); @@ -68,7 +68,7 @@ public class BaseClickhouseData { ResultSet resultSet = statement.executeQuery(sql); while (resultSet.next()) { BaseDocument newDoc = getVertexIpDocument(resultSet); - putMapByHashcode(newDoc,vIpMap); + putMapByHashcode(newDoc, newVertexIpMap); } long last = System.currentTimeMillis(); LOG.info(sql + "\n读取clickhouse v_IP时间:" + (last - start)); @@ -80,7 +80,7 @@ public class BaseClickhouseData { } void baseVertexSubscriber(){ - initializeMap(vSubscriberMap); + initializeMap(newVertexSubscriberMap); LOG.info("SUBSCRIBER resultMap初始化完成"); String sql = getVertexSubscriberSql(); long start = System.currentTimeMillis(); @@ -90,7 +90,7 @@ public class BaseClickhouseData { ResultSet resultSet = statement.executeQuery(sql); while (resultSet.next()){ BaseDocument newDoc = getVertexSubscriberDocument(resultSet); - putMapByHashcode(newDoc,vSubscriberMap); + putMapByHashcode(newDoc, newVertexSubscriberMap); } long last = System.currentTimeMillis(); LOG.info(sql + "\n读取clickhouse v_SUBSCRIBER时间:" + (last - start)); @@ -103,7 +103,7 @@ public class BaseClickhouseData { } void baseRelationshipSubscriberLocateIp(){ - initializeMap(eSubsciberLocateIpMap); + initializeMap(newRelationSubsciberLocateIpMap); LOG.info("R_LOCATE_SUBSCRIBER2IP"); String sql = getRelationshipSubsciberLocateIpSql(); long start = System.currentTimeMillis(); @@ -113,7 +113,7 @@ public class BaseClickhouseData { ResultSet resultSet = statement.executeQuery(sql); while (resultSet.next()){ BaseEdgeDocument newDoc = getRelationshipSubsciberLocateIpDocument(resultSet); - putMapByHashcode(newDoc,eSubsciberLocateIpMap); + putMapByHashcode(newDoc, newRelationSubsciberLocateIpMap); } long last = System.currentTimeMillis(); LOG.info(sql + "\n读取clickhouse ESubsciberLocateIp时间:" + (last - start)); @@ -126,7 +126,7 @@ public class BaseClickhouseData { } void baseRelationshipFqdnAddressIp() { - initializeMap(eFqdnAddressIpMap); + initializeMap(newRelationFqdnAddressIpMap); LOG.info("R_LOCATE_FQDN2IP resultMap初始化完成"); String sql = getRelationshipFqdnAddressIpSql(); long start = System.currentTimeMillis(); @@ -137,7 +137,7 @@ public class BaseClickhouseData { while (resultSet.next()) { BaseEdgeDocument newDoc = getRelationFqdnAddressIpDocument(resultSet); - putMapByHashcode(newDoc,eFqdnAddressIpMap); + putMapByHashcode(newDoc, newRelationFqdnAddressIpMap); } long last = System.currentTimeMillis(); LOG.info(sql + "\n读取clickhouse EFqdnAddressIp时间:" + (last - start)); @@ -149,7 +149,7 @@ public class BaseClickhouseData { } void baseRelationshipIpVisitFqdn() { - initializeMap(eIpVisitFqdnMap); + initializeMap(newRelationIpVisitFqdnMap); LOG.info("R_VISIT_IP2FQDN resultMap初始化完成"); String sql = getRelationshipIpVisitFqdnSql(); long start = System.currentTimeMillis(); @@ -159,7 +159,7 @@ public class BaseClickhouseData { ResultSet resultSet = statement.executeQuery(sql); while (resultSet.next()) { BaseEdgeDocument newDoc = getRelationIpVisitFqdnDocument(resultSet); - putMapByHashcode(newDoc,eIpVisitFqdnMap); + putMapByHashcode(newDoc, newRelationIpVisitFqdnMap); } long last = System.currentTimeMillis(); LOG.info(sql + "\n读取clickhouse EIpVisitFqdn时间:" + (last - start)); diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/dao/UpdateGraphData.java b/ip-learning-java-test/src/main/java/cn/ac/iie/dao/UpdateGraphData.java index e3641a0..b888411 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/dao/UpdateGraphData.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/dao/UpdateGraphData.java @@ -1,12 +1,13 @@ package cn.ac.iie.dao; import cn.ac.iie.config.ApplicationConfig; -import cn.ac.iie.service.relationship.LocateFqdn2Ip; -import cn.ac.iie.service.relationship.LocateSubscriber2Ip; -import cn.ac.iie.service.relationship.VisitIp2Fqdn; -import cn.ac.iie.service.vertex.Fqdn; -import cn.ac.iie.service.vertex.Ip; -import cn.ac.iie.service.vertex.Subscriber; +import cn.ac.iie.service.update.Document; +import cn.ac.iie.service.update.relationship.LocateFqdn2Ip; +import cn.ac.iie.service.update.relationship.LocateSubscriber2Ip; +import cn.ac.iie.service.update.relationship.VisitIp2Fqdn; +import cn.ac.iie.service.update.vertex.Fqdn; +import cn.ac.iie.service.update.vertex.Ip; +import cn.ac.iie.service.update.vertex.Subscriber; import cn.ac.iie.utils.ArangoDBConnect; import cn.ac.iie.utils.ExecutorThreadPool; import com.arangodb.entity.BaseDocument; @@ -18,8 +19,12 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.concurrent.CountDownLatch; +import static cn.ac.iie.dao.BaseArangoData.*; +import static cn.ac.iie.dao.BaseClickhouseData.*; + /** * 更新图数据库业务类 + * @author wlh */ public class UpdateGraphData { private static final Logger LOG = LoggerFactory.getLogger(UpdateGraphData.class); @@ -30,122 +35,161 @@ public class UpdateGraphData { private CountDownLatch countDownLatch; public void updateArango(){ - long startC = System.currentTimeMillis(); + long start = System.currentTimeMillis(); try { + BaseArangoData baseArangoData = new BaseArangoData(); + + baseArangoData.readHistoryData("FQDN", historyVertexFqdnMap,BaseDocument.class); updateVertexFqdn(); + + baseArangoData.readHistoryData("IP", historyVertexIpMap,BaseDocument.class); updateVertexIp(); - updateRelationFqdnAddressIp(); - updateRelationIpVisitFqdn(); + + baseArangoData.readHistoryData("SUBSCRIBER", historyVertexSubscriberMap,BaseDocument.class); updateVertexSubscriber(); + + baseArangoData.readHistoryData("R_LOCATE_FQDN2IP", historyRelationFqdnAddressIpMap,BaseEdgeDocument.class); + updateRelationFqdnAddressIp(); + + baseArangoData.readHistoryData("R_VISIT_IP2FQDN", historyRelationIpVisitFqdnMap,BaseEdgeDocument.class); + updateRelationIpVisitFqdn(); + + baseArangoData.readHistoryData("R_LOCATE_SUBSCRIBER2IP", historyRelationSubsciberLocateIpMap,BaseEdgeDocument.class); updateRelationshipSubsciberLocateIp(); + + long last = System.currentTimeMillis(); + LOG.info("更新图数据库时间共计:"+(last - start)); }catch (Exception e){ e.printStackTrace(); }finally { arangoManger.clean(); + pool.shutdown(); } - long lastC = System.currentTimeMillis(); - LOG.info("更新ArangoDb时间:"+(lastC - startC)); } private void updateVertexFqdn(){ - baseClickhouseData.baseVertexFqdn(); try { + long start = System.currentTimeMillis(); + baseClickhouseData.baseVertexFqdn(); countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { - HashMap> stringArrayListHashMap = BaseClickhouseData.vFqdnMap.get(i); - LOG.info("vFqdn baseDocumentHashMap大小:" + stringArrayListHashMap.size()); - Fqdn updateFqdn = new Fqdn(stringArrayListHashMap, arangoManger, "FQDN", BaseArangoData.v_Fqdn_Map,countDownLatch); - updateFqdn.run(); + HashMap> tmpMap = newVertexFqdnMap.get(i); + Document updateFqdn = new Fqdn(tmpMap, arangoManger, "FQDN", historyVertexFqdnMap,countDownLatch); + pool.executor(updateFqdn); } countDownLatch.await(); - LOG.info("---------FQDN vertex 更新完毕---------"); + long last = System.currentTimeMillis(); + LOG.info("FQDN vertex 更新完毕,共耗时:"+(last-start)); }catch (Exception e){ e.printStackTrace(); + }finally { + historyVertexFqdnMap.clear(); + newVertexFqdnMap.clear(); } } private void updateVertexSubscriber(){ - baseClickhouseData.baseVertexSubscriber(); try { + long start = System.currentTimeMillis(); + baseClickhouseData.baseVertexSubscriber(); countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { - HashMap> stringArrayListHashMap = BaseClickhouseData.vSubscriberMap.get(i); - LOG.info("vSubscriber baseDocumentHashMap大小:" + stringArrayListHashMap.size()); - Subscriber updateSubscriber = new Subscriber(stringArrayListHashMap, arangoManger, "SUBSCRIBER", BaseArangoData.v_Subscriber_Map,countDownLatch); - updateSubscriber.run(); + HashMap> tmpMap = newVertexSubscriberMap.get(i); + Subscriber updateSubscriber = new Subscriber(tmpMap, arangoManger, "SUBSCRIBER", historyVertexSubscriberMap,countDownLatch); + pool.executor(updateSubscriber); } countDownLatch.await(); - LOG.info("---------SUBSCRIBER vertex 更新完毕---------"); + long last = System.currentTimeMillis(); + LOG.info("SUBSCRIBER vertex 更新完毕,共耗时:"+(last-start)); }catch (Exception e){ e.printStackTrace(); + }finally { + historyVertexSubscriberMap.clear(); + newVertexSubscriberMap.clear(); } } private void updateRelationshipSubsciberLocateIp(){ - baseClickhouseData.baseRelationshipSubscriberLocateIp(); try { + long start = System.currentTimeMillis(); + baseClickhouseData.baseRelationshipSubscriberLocateIp(); countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { - HashMap> baseDocumentHashMap = BaseClickhouseData.eSubsciberLocateIpMap.get(i); - LOG.info("ESubsciberLocateIp baseDocumentHashMap大小:" + baseDocumentHashMap.size()); - LocateSubscriber2Ip locateSubscriber2Ip = new LocateSubscriber2Ip(baseDocumentHashMap, arangoManger, "R_LOCATE_SUBSCRIBER2IP", BaseArangoData.e_Subsciber_Locate_Ip_Map, countDownLatch); - locateSubscriber2Ip.run(); + HashMap> tmpMap = newRelationSubsciberLocateIpMap.get(i); + LocateSubscriber2Ip locateSubscriber2Ip = new LocateSubscriber2Ip(tmpMap, arangoManger, "R_LOCATE_SUBSCRIBER2IP", historyRelationSubsciberLocateIpMap, countDownLatch); + pool.executor(locateSubscriber2Ip); } countDownLatch.await(); - LOG.info("------------R_LOCATE_SUBSCRIBER2IP relationship 更新完毕----------------"); + long last = System.currentTimeMillis(); + LOG.info("R_LOCATE_SUBSCRIBER2IP relationship 更新完毕,共耗时:"+(last-start)); }catch (Exception e){ e.printStackTrace(); + }finally { + historyRelationSubsciberLocateIpMap.clear(); + newRelationSubsciberLocateIpMap.clear(); } } private void updateVertexIp(){ - baseClickhouseData.baseVertexIp(); try { + long start = System.currentTimeMillis(); + baseClickhouseData.baseVertexIp(); countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { - HashMap> stringArrayListHashMap = BaseClickhouseData.vIpMap.get(i); - LOG.info("vIp baseDocumentHashMap大小:" + stringArrayListHashMap.size()); - Ip updateIp = new Ip(stringArrayListHashMap, arangoManger, "IP", BaseArangoData.v_Ip_Map, countDownLatch); - updateIp.run(); + HashMap> tmpMap = newVertexIpMap.get(i); + Ip updateIp = new Ip(tmpMap, arangoManger, "IP", historyVertexIpMap, countDownLatch); + pool.executor(updateIp); } countDownLatch.await(); - LOG.info("----------IP vertex 更新完毕-------------"); + long last = System.currentTimeMillis(); + LOG.info("IP vertex 更新完毕,共耗时:"+(last-start)); }catch (Exception e){ e.printStackTrace(); + }finally { + historyVertexIpMap.clear(); + newVertexIpMap.clear(); } } private void updateRelationFqdnAddressIp(){ - baseClickhouseData.baseRelationshipFqdnAddressIp(); try { + long start = System.currentTimeMillis(); + baseClickhouseData.baseRelationshipFqdnAddressIp(); countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { - HashMap> baseDocumentHashMap = BaseClickhouseData.eFqdnAddressIpMap.get(i); - LOG.info("EFqdnAddressIp baseDocumentHashMap大小:" + baseDocumentHashMap.size()); - LocateFqdn2Ip fqdnAddressIp = new LocateFqdn2Ip(baseDocumentHashMap, arangoManger, "R_LOCATE_FQDN2IP", BaseArangoData.e_Fqdn_Address_Ip_Map, countDownLatch); - fqdnAddressIp.run(); + HashMap> tmpMap = newRelationFqdnAddressIpMap.get(i); + LocateFqdn2Ip fqdnAddressIp = new LocateFqdn2Ip(tmpMap, arangoManger, "R_LOCATE_FQDN2IP", historyRelationFqdnAddressIpMap, countDownLatch); + pool.executor(fqdnAddressIp); } countDownLatch.await(); - LOG.info("------------R_LOCATE_FQDN2IP relationship 更新完毕----------------"); + long last = System.currentTimeMillis(); + LOG.info("R_LOCATE_FQDN2IP relationship 更新完毕,共耗时:"+(last-start)); }catch (Exception e){ e.printStackTrace(); + }finally { + historyRelationFqdnAddressIpMap.clear(); + newRelationFqdnAddressIpMap.clear(); } } private void updateRelationIpVisitFqdn(){ - baseClickhouseData.baseRelationshipIpVisitFqdn(); try { + long start = System.currentTimeMillis(); + baseClickhouseData.baseRelationshipIpVisitFqdn(); countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { - HashMap> baseDocumentHashMap = BaseClickhouseData.eIpVisitFqdnMap.get(i); - LOG.info("EIpVisitFqdn baseDocumentHashMap大小:" + baseDocumentHashMap.size()); - VisitIp2Fqdn ipVisitFqdn = new VisitIp2Fqdn(baseDocumentHashMap,arangoManger,"R_VISIT_IP2FQDN",BaseArangoData.e_Ip_Visit_Fqdn_Map,countDownLatch); - ipVisitFqdn.run(); + HashMap> tmpMap = newRelationIpVisitFqdnMap.get(i); + VisitIp2Fqdn ipVisitFqdn = new VisitIp2Fqdn(tmpMap,arangoManger,"R_VISIT_IP2FQDN", historyRelationIpVisitFqdnMap,countDownLatch); + pool.executor(ipVisitFqdn); } countDownLatch.await(); - LOG.info("---------------R_VISIT_IP2FQDN ralationship 更新完毕----------------"); + long last = System.currentTimeMillis(); + LOG.info("R_VISIT_IP2FQDN ralationship 更新完毕,共耗时:"+(last-start)); }catch (Exception e){ e.printStackTrace(); + }finally { + historyRelationIpVisitFqdnMap.clear(); + newRelationIpVisitFqdnMap.clear(); } } diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateEFqdnAddressIp.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateEFqdnAddressIp.java index 7b11692..bf36728 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateEFqdnAddressIp.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateEFqdnAddressIp.java @@ -29,7 +29,7 @@ public class UpdateEFqdnAddressIp implements Runnable { BaseEdgeDocument newEdgeDocument = documentHashMap.getOrDefault(key, null); if (newEdgeDocument != null){ i += 1; - BaseEdgeDocument edgeDocument = BaseArangoData.e_Fqdn_Address_Ip_Map.getOrDefault(key, null); + BaseEdgeDocument edgeDocument = BaseArangoData.historyRelationFqdnAddressIpMap.getOrDefault(key, null); if (edgeDocument != null){ Object lastFoundTime = newEdgeDocument.getAttribute("LAST_FOUND_TIME"); long countTotal = Long.parseLong(newEdgeDocument.getAttribute("COUNT_TOTAL").toString()); diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateEIpVisitFqdn.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateEIpVisitFqdn.java index 7a0ddf2..092e794 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateEIpVisitFqdn.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateEIpVisitFqdn.java @@ -29,7 +29,7 @@ public class UpdateEIpVisitFqdn implements Runnable { BaseEdgeDocument newEdgeDocument = documentHashMap.getOrDefault(key, null); if (newEdgeDocument != null){ i += 1; - BaseEdgeDocument edgeDocument = BaseArangoData.e_Ip_Visit_Fqdn_Map.getOrDefault(key, null); + BaseEdgeDocument edgeDocument = BaseArangoData.historyRelationIpVisitFqdnMap.getOrDefault(key, null); if (edgeDocument != null){ Object lastFoundTime = newEdgeDocument.getAttribute("LAST_FOUND_TIME"); long countTotal = Long.parseLong(newEdgeDocument.getAttribute("COUNT_TOTAL").toString()); diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateVFqdn.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateVFqdn.java index c49aec4..f14b69a 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateVFqdn.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateVFqdn.java @@ -33,7 +33,7 @@ public class UpdateVFqdn implements Runnable{ if (newDocument != null){ i += 1; - BaseDocument document = BaseArangoData.v_Fqdn_Map.getOrDefault(key, null); + BaseDocument document = BaseArangoData.historyVertexFqdnMap.getOrDefault(key, null); if (document != null){ Object lastFoundTime = newDocument.getAttribute("LAST_FOUND_TIME"); long fqdnCountTotal = Long.parseLong(newDocument.getAttribute("FQDN_COUNT_TOTAL").toString()); diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateVIP.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateVIP.java index 08fff78..3b83769 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateVIP.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateVIP.java @@ -31,7 +31,7 @@ public class UpdateVIP implements Runnable { BaseDocument newDocument = documentHashMap.getOrDefault(key, null); if (newDocument != null){ i += 1; - BaseDocument document = BaseArangoData.v_Ip_Map.getOrDefault(key, null); + BaseDocument document = BaseArangoData.historyVertexIpMap.getOrDefault(key, null); if (document != null){ Object lastFoundTime = newDocument.getAttribute("LAST_FOUND_TIME"); long ipCountTotal = Long.parseLong(newDocument.getAttribute("IP_COUNT_TOTAL").toString()); diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java index e0aec45..b5f4619 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java @@ -9,6 +9,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; /** * @author wlh @@ -22,38 +23,46 @@ public class ReadHistoryArangoData extends Thread { private ConcurrentHashMap map; private Class type; private String table; + private CountDownLatch countDownLatch; - public ReadHistoryArangoData(ArangoDBConnect arangoConnect, String query, ConcurrentHashMap map, Class type, String table) { + public ReadHistoryArangoData(ArangoDBConnect arangoConnect, String query, ConcurrentHashMap map, Class type, String table,CountDownLatch countDownLatch) { this.arangoConnect = arangoConnect; this.query = query; this.map = map; this.type = type; this.table = table; + this.countDownLatch = countDownLatch; } @Override public void run() { - long s = System.currentTimeMillis(); - ArangoCursor docs = arangoConnect.executorQuery(query, type); - if (docs != null) { - List baseDocuments = docs.asListRemaining(); - int i = 0; - for (T doc : baseDocuments) { - String key = doc.getKey(); - switch (table) { - case "R_LOCATE_FQDN2IP": - updateProtocolDocument(doc); - break; - case "R_VISIT_IP2FQDN": - updateProtocolDocument(doc); - break; - default: + try { + long s = System.currentTimeMillis(); + ArangoCursor docs = arangoConnect.executorQuery(query, type); + if (docs != null) { + List baseDocuments = docs.asListRemaining(); + int i = 0; + for (T doc : baseDocuments) { + String key = doc.getKey(); + switch (table) { + case "R_LOCATE_FQDN2IP": + updateProtocolDocument(doc); + break; + case "R_VISIT_IP2FQDN": + updateProtocolDocument(doc); + break; + default: + } + map.put(key, doc); + i++; } - map.put(key, doc); - i++; + long l = System.currentTimeMillis(); + LOG.info(query + "\n读取数据" + i + "条,运行时间:" + (l - s)); } - long l = System.currentTimeMillis(); - LOG.info(query + "\n处理数据" + i + "条,运行时间:" + (l - s)); + }catch (Exception e){ + e.printStackTrace(); + }finally { + countDownLatch.countDown(); } } diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Document.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Document.java index 1c9203d..928ed87 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Document.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Document.java @@ -39,6 +39,7 @@ public class Document extends Thread{ @Override public void run() { + LOG.info(collectionName+" new Map 大小:"+newDocumentMap.size()); Set keySet = newDocumentMap.keySet(); ArrayList resultDocumentList = new ArrayList<>(); int i = 0; diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/relationship/LocateFqdn2Ip.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/relationship/LocateFqdn2Ip.java similarity index 98% rename from ip-learning-java-test/src/main/java/cn/ac/iie/service/relationship/LocateFqdn2Ip.java rename to ip-learning-java-test/src/main/java/cn/ac/iie/service/update/relationship/LocateFqdn2Ip.java index 0515381..373e8d0 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/service/relationship/LocateFqdn2Ip.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/relationship/LocateFqdn2Ip.java @@ -1,4 +1,4 @@ -package cn.ac.iie.service.relationship; +package cn.ac.iie.service.update.relationship; import cn.ac.iie.service.read.ReadClickhouseData; import cn.ac.iie.service.update.Relationship; diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/relationship/LocateSubscriber2Ip.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/relationship/LocateSubscriber2Ip.java similarity index 94% rename from ip-learning-java-test/src/main/java/cn/ac/iie/service/relationship/LocateSubscriber2Ip.java rename to ip-learning-java-test/src/main/java/cn/ac/iie/service/update/relationship/LocateSubscriber2Ip.java index d5e60b9..5ca4cb0 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/service/relationship/LocateSubscriber2Ip.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/relationship/LocateSubscriber2Ip.java @@ -1,4 +1,4 @@ -package cn.ac.iie.service.relationship; +package cn.ac.iie.service.update.relationship; import cn.ac.iie.service.update.Relationship; import cn.ac.iie.utils.ArangoDBConnect; diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/relationship/VisitIp2Fqdn.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/relationship/VisitIp2Fqdn.java similarity index 96% rename from ip-learning-java-test/src/main/java/cn/ac/iie/service/relationship/VisitIp2Fqdn.java rename to ip-learning-java-test/src/main/java/cn/ac/iie/service/update/relationship/VisitIp2Fqdn.java index f6df715..6565d84 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/service/relationship/VisitIp2Fqdn.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/relationship/VisitIp2Fqdn.java @@ -1,4 +1,4 @@ -package cn.ac.iie.service.relationship; +package cn.ac.iie.service.update.relationship; import cn.ac.iie.service.update.Relationship; import cn.ac.iie.utils.ArangoDBConnect; diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/vertex/Fqdn.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/vertex/Fqdn.java deleted file mode 100644 index 976925a..0000000 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/service/vertex/Fqdn.java +++ /dev/null @@ -1,21 +0,0 @@ -package cn.ac.iie.service.vertex; - -import cn.ac.iie.service.update.Vertex; -import cn.ac.iie.utils.ArangoDBConnect; -import com.arangodb.entity.BaseDocument; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; - -public class Fqdn extends Vertex { - - public Fqdn(HashMap> newDocumentHashMap, - ArangoDBConnect arangoManger, - String collectionName, - ConcurrentHashMap historyDocumentMap, - CountDownLatch countDownLatch) { - super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap,countDownLatch); - } -} diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/vertex/Ip.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/vertex/Ip.java deleted file mode 100644 index 001b993..0000000 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/service/vertex/Ip.java +++ /dev/null @@ -1,79 +0,0 @@ -package cn.ac.iie.service.vertex; - -import cn.ac.iie.service.update.Vertex; -import cn.ac.iie.utils.ArangoDBConnect; -import com.arangodb.entity.BaseDocument; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; - -public class Ip extends Vertex { - - public Ip(HashMap> newDocumentHashMap, - ArangoDBConnect arangoManger, - String collectionName, - ConcurrentHashMap historyDocumentMap, - CountDownLatch countDownLatch) { - super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap,countDownLatch); - } - - @Override - protected void updateFunction(BaseDocument newDocument, BaseDocument historyDocument) { - super.updateFunction(newDocument, historyDocument); - updateIpByType(newDocument, historyDocument); - } - - @Override - protected void mergeFunction(Map properties, BaseDocument doc) { - super.mergeFunction(properties, doc); - mergeIpByType(properties,doc); - } - - private void mergeIpByType(Map properties, BaseDocument doc){ - Map mergeProperties = doc.getProperties(); - checkIpTypeProperty(properties,mergeProperties,"CLIENT_SESSION_COUNT"); - checkIpTypeProperty(properties,mergeProperties,"CLIENT_BYTES_SUM"); - checkIpTypeProperty(properties,mergeProperties,"SERVER_SESSION_COUNT"); - checkIpTypeProperty(properties,mergeProperties,"SERVER_BYTES_SUM"); - } - - private void checkIpTypeProperty(Map properties,Map mergeProperties,String property){ - try { - if (!properties.containsKey(property)){ - properties.put(property,0L); - checkIpTypeProperty(properties,mergeProperties,property); - }else if ("0".equals(properties.get(property).toString()) && mergeProperties.containsKey(property)){ - if (!"0".equals(mergeProperties.get(property).toString())){ - properties.put(property,Long.parseLong(mergeProperties.get(property).toString())); - } - } - }catch (Exception e){ - e.printStackTrace(); - } - } - - private void updateIpByType(BaseDocument newDocument, BaseDocument historyDocument){ - addProperty(newDocument,historyDocument,"CLIENT_SESSION_COUNT"); - addProperty(newDocument,historyDocument,"CLIENT_BYTES_SUM"); - addProperty(newDocument,historyDocument,"SERVER_SESSION_COUNT"); - addProperty(newDocument,historyDocument,"SERVER_BYTES_SUM"); - } - - private void addProperty(BaseDocument newDocument, BaseDocument historyDocument,String property){ - try { - if (historyDocument.getProperties().containsKey(property)){ - long newProperty = Long.parseLong(newDocument.getAttribute(property).toString()); - long hisProperty = Long.parseLong(historyDocument.getAttribute(property).toString()); - historyDocument.updateAttribute(property,newProperty+hisProperty); - }else { - historyDocument.addAttribute(property,0L); - } - }catch (Exception e){ - e.printStackTrace(); - } - } - -} diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/vertex/Subscriber.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/vertex/Subscriber.java deleted file mode 100644 index 8689980..0000000 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/service/vertex/Subscriber.java +++ /dev/null @@ -1,21 +0,0 @@ -package cn.ac.iie.service.vertex; - -import cn.ac.iie.service.update.Vertex; -import cn.ac.iie.utils.ArangoDBConnect; -import com.arangodb.entity.BaseDocument; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; - -public class Subscriber extends Vertex { - - public Subscriber(HashMap> newDocumentHashMap, - ArangoDBConnect arangoManger, - String collectionName, - ConcurrentHashMap historyDocumentMap, - CountDownLatch countDownLatch) { - super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap, countDownLatch); - } -} diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java b/ip-learning-java-test/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java index e29202d..a56f097 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java @@ -1,25 +1,18 @@ package cn.ac.iie.test; -import cn.ac.iie.dao.BaseArangoData; import cn.ac.iie.dao.UpdateGraphData; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +/** + * iplearning程序入口 + * @author wlh + */ public class IpLearningApplicationTest { - private static final Logger LOG = LoggerFactory.getLogger(IpLearningApplicationTest.class); public static void main(String[] args) { - long start = System.currentTimeMillis(); - LOG.info("Ip Learning Application开始运行"); - BaseArangoData baseArangoData = new BaseArangoData(); - baseArangoData.baseDocumentDataMap(); - LOG.info("历史数据读取完成,开始更新数据"); UpdateGraphData updateGraphData = new UpdateGraphData(); updateGraphData.updateArango(); - long last = System.currentTimeMillis(); - LOG.info("共计运行时间:"+(last - start)); } } diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java b/ip-learning-java-test/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java index b2ce9ba..e0de171 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java @@ -20,10 +20,10 @@ public class ArangoDBConnect { private static ArangoDB arangoDB = null; private static ArangoDBConnect conn = null; static { - getArangoDB(); + getArangoDatabase(); } - private static void getArangoDB(){ + private static void getArangoDatabase(){ arangoDB = new ArangoDB.Builder() .maxConnections(ApplicationConfig.THREAD_POOL_NUMBER) .host(ApplicationConfig.ARANGODB_HOST, ApplicationConfig.ARANGODB_PORT) @@ -39,7 +39,7 @@ public class ArangoDBConnect { return conn; } - public ArangoDatabase getDatabase(){ + private ArangoDatabase getDatabase(){ return arangoDB.db(ApplicationConfig.ARANGODB_DB_NAME); } diff --git a/ip-learning-java-test/src/main/resources/application.properties b/ip-learning-java-test/src/main/resources/application.properties index 61aca8d..313d233 100644 --- a/ip-learning-java-test/src/main/resources/application.properties +++ b/ip-learning-java-test/src/main/resources/application.properties @@ -1,9 +1,9 @@ #arangoDB参数配置 -arangoDB.host=192.168.40.127 +arangoDB.host=192.168.40.182 arangoDB.port=8529 arangoDB.user=root arangoDB.password=111111 -arangoDB.DB.name=ip-learning-test +arangoDB.DB.name=ip-learning-test-0 arangoDB.batch=100000 arangoDB.ttl=3600