From 4df142cdf696a5fefe0e8b30cbcce5b3332c00a7 Mon Sep 17 00:00:00 2001 From: wanglihui <949764788@qq.com> Date: Mon, 20 Jul 2020 19:36:32 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E5=A4=84=E7=90=86=E9=80=BB?= =?UTF-8?q?=E8=BE=91=EF=BC=8C=E6=8C=89=E7=85=A7=E6=96=87=E6=A1=A3=E5=88=86?= =?UTF-8?q?=E5=88=AB=E5=A4=84=E7=90=86=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/cn/ac/iie/dao/BaseArangoData.java | 51 +++---- .../cn/ac/iie/dao/BaseClickhouseData.java | 38 ++--- .../java/cn/ac/iie/dao/UpdateGraphData.java | 134 ++++++++++++------ .../ReadClickhouseData.java | 2 +- .../ReadHistoryArangoData.java | 55 ++++--- .../ac/iie/service/update/Relationship.java | 2 +- .../update/relationship/LocateFqdn2Ip.java | 2 +- .../update/relationship/VisitIp2Fqdn.java | 2 +- .../iie/test/IpLearningApplicationTest.java | 13 -- .../src/main/resources/application.properties | 2 +- 10 files changed, 166 insertions(+), 135 deletions(-) rename IP-learning-graph/src/main/java/cn/ac/iie/service/{read => ingestion}/ReadClickhouseData.java (99%) rename IP-learning-graph/src/main/java/cn/ac/iie/service/{read => ingestion}/ReadHistoryArangoData.java (56%) diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java index e4e77ca..15d918f 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java @@ -1,7 +1,7 @@ package cn.ac.iie.dao; import cn.ac.iie.config.ApplicationConfig; -import cn.ac.iie.service.read.ReadHistoryArangoData; +import cn.ac.iie.service.ingestion.ReadHistoryArangoData; import cn.ac.iie.utils.ArangoDBConnect; import cn.ac.iie.utils.ExecutorThreadPool; import com.arangodb.ArangoCursor; @@ -11,6 +11,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; /** * 获取arangoDB历史数据 @@ -18,45 +19,31 @@ import java.util.concurrent.ConcurrentHashMap; public class BaseArangoData { private static final Logger LOG = LoggerFactory.getLogger(BaseArangoData.class); - static ConcurrentHashMap v_Fqdn_Map = new ConcurrentHashMap<>(); - static ConcurrentHashMap v_Ip_Map = new ConcurrentHashMap<>(); - static ConcurrentHashMap v_Subscriber_Map = new ConcurrentHashMap<>(); - static ConcurrentHashMap e_Fqdn_Address_Ip_Map = new ConcurrentHashMap<>(); - static ConcurrentHashMap e_Ip_Visit_Fqdn_Map = new ConcurrentHashMap<>(); - static ConcurrentHashMap e_Subsciber_Locate_Ip_Map = new ConcurrentHashMap<>(); + public static ConcurrentHashMap historyVertexFqdnMap = new ConcurrentHashMap<>(); + public static ConcurrentHashMap historyVertexIpMap = new ConcurrentHashMap<>(); + public static ConcurrentHashMap historyVertexSubscriberMap = new ConcurrentHashMap<>(); + public static ConcurrentHashMap historyRelationFqdnAddressIpMap = new ConcurrentHashMap<>(); + public static ConcurrentHashMap historyRelationIpVisitFqdnMap = new ConcurrentHashMap<>(); + public static ConcurrentHashMap historyRelationSubsciberLocateIpMap = new ConcurrentHashMap<>(); private static ArangoDBConnect arangoDBConnect = ArangoDBConnect.getInstance(); - private static ExecutorThreadPool threadPool = ExecutorThreadPool.getInstance(); + private ExecutorThreadPool threadPool = ExecutorThreadPool.getInstance(); - public void baseDocumentDataMap(){ - long startA = System.currentTimeMillis(); - readHistoryData("FQDN", v_Fqdn_Map,BaseDocument.class); - readHistoryData("IP", v_Ip_Map,BaseDocument.class); - readHistoryData("SUBSCRIBER",v_Subscriber_Map,BaseDocument.class); - readHistoryData("R_LOCATE_FQDN2IP", e_Fqdn_Address_Ip_Map,BaseEdgeDocument.class); -// readHistoryData("R_VISIT_IP2FQDN", e_Ip_Visit_Fqdn_Map,BaseEdgeDocument.class); - readHistoryData("R_LOCATE_SUBSCRIBER2IP",e_Subsciber_Locate_Ip_Map,BaseEdgeDocument.class); - threadPool.shutdown(); - threadPool.awaitThreadTask(); - LOG.info("v_Fqdn_Map大小:"+v_Fqdn_Map.size()); - LOG.info("v_Ip_Map大小:"+v_Ip_Map.size()); - LOG.info("v_Subscriber_Map大小:"+v_Subscriber_Map.size()); - LOG.info("e_Fqdn_Address_Ip_Map大小:"+e_Fqdn_Address_Ip_Map.size()); - LOG.info("e_Ip_Visit_Fqdn_Map大小:"+e_Ip_Visit_Fqdn_Map.size()); - LOG.info("e_Subsciber_Locate_Ip_Map大小:"+e_Subsciber_Locate_Ip_Map.size()); - long lastA = System.currentTimeMillis(); - LOG.info("读取ArangoDb时间:"+(lastA - startA)); - } - - private void readHistoryData(String collectionName, ConcurrentHashMap map, Class type){ + void readHistoryData(String table, ConcurrentHashMap map, Class type){ try { - long[] timeRange = getTimeRange(collectionName); + long start = System.currentTimeMillis(); + CountDownLatch countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); + long[] timeRange = getTimeRange(table); for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { - String sql = getQuerySql(timeRange, i, collectionName); - ReadHistoryArangoData readHistoryArangoData = new ReadHistoryArangoData<>(arangoDBConnect, sql, map,type,collectionName); + String sql = getQuerySql(timeRange, i, table); + ReadHistoryArangoData readHistoryArangoData = new ReadHistoryArangoData<>(arangoDBConnect, sql, map,type,table,countDownLatch); threadPool.executor(readHistoryArangoData); } + countDownLatch.await(); + long last = System.currentTimeMillis(); + LOG.info("读取"+table+" arangoDB 共耗时:"+(last-start)); + LOG.info(table+" history Map大小为:"+map.size()); }catch (Exception e){ e.printStackTrace(); } diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java index b33b73c..fd98cab 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java @@ -13,7 +13,7 @@ import java.sql.Statement; import java.util.ArrayList; import java.util.HashMap; -import static cn.ac.iie.service.read.ReadClickhouseData.*; +import static cn.ac.iie.service.ingestion.ReadClickhouseData.*; /** * 读取clickhouse数据,封装到map @@ -23,18 +23,18 @@ public class BaseClickhouseData { private static final Logger LOG = LoggerFactory.getLogger(BaseClickhouseData.class); private static ClickhouseConnect manger = ClickhouseConnect.getInstance(); - static HashMap>> vFqdnMap = new HashMap<>(); - static HashMap>> vIpMap = new HashMap<>(); - static HashMap>> vSubscriberMap = new HashMap<>(); - static HashMap>> eFqdnAddressIpMap = new HashMap<>(); - static HashMap>> eIpVisitFqdnMap = new HashMap<>(); - static HashMap>> eSubsciberLocateIpMap = new HashMap<>(); + static HashMap>> newVertexFqdnMap = new HashMap<>(); + static HashMap>> newVertexIpMap = new HashMap<>(); + static HashMap>> newVertexSubscriberMap = new HashMap<>(); + static HashMap>> newRelationFqdnAddressIpMap = new HashMap<>(); + static HashMap>> newRelationIpVisitFqdnMap = new HashMap<>(); + static HashMap>> newRelationSubsciberLocateIpMap = new HashMap<>(); private DruidPooledConnection connection; private Statement statement; void baseVertexFqdn() { - initializeMap(vFqdnMap); + initializeMap(newVertexFqdnMap); LOG.info("FQDN resultMap初始化完成"); String sql = getVertexFqdnSql(); long start = System.currentTimeMillis(); @@ -45,7 +45,7 @@ public class BaseClickhouseData { while (resultSet.next()) { BaseDocument newDoc = getVertexFqdnDocument(resultSet); if (newDoc != null) { - putMapByHashcode(newDoc,vFqdnMap); + putMapByHashcode(newDoc, newVertexFqdnMap); } } long last = System.currentTimeMillis(); @@ -58,7 +58,7 @@ public class BaseClickhouseData { } void baseVertexIp() { - initializeMap(vIpMap); + initializeMap(newVertexIpMap); LOG.info("IP resultMap初始化完成"); String sql = getVertexIpSql(); long start = System.currentTimeMillis(); @@ -68,7 +68,7 @@ public class BaseClickhouseData { ResultSet resultSet = statement.executeQuery(sql); while (resultSet.next()) { BaseDocument newDoc = getVertexIpDocument(resultSet); - putMapByHashcode(newDoc,vIpMap); + putMapByHashcode(newDoc, newVertexIpMap); } long last = System.currentTimeMillis(); LOG.info(sql + "\n读取clickhouse v_IP时间:" + (last - start)); @@ -80,7 +80,7 @@ public class BaseClickhouseData { } void baseVertexSubscriber(){ - initializeMap(vSubscriberMap); + initializeMap(newVertexSubscriberMap); LOG.info("SUBSCRIBER resultMap初始化完成"); String sql = getVertexSubscriberSql(); long start = System.currentTimeMillis(); @@ -90,7 +90,7 @@ public class BaseClickhouseData { ResultSet resultSet = statement.executeQuery(sql); while (resultSet.next()){ BaseDocument newDoc = getVertexSubscriberDocument(resultSet); - putMapByHashcode(newDoc,vSubscriberMap); + putMapByHashcode(newDoc, newVertexSubscriberMap); } long last = System.currentTimeMillis(); LOG.info(sql + "\n读取clickhouse v_SUBSCRIBER时间:" + (last - start)); @@ -103,7 +103,7 @@ public class BaseClickhouseData { } void baseRelationshipSubscriberLocateIp(){ - initializeMap(eSubsciberLocateIpMap); + initializeMap(newRelationSubsciberLocateIpMap); LOG.info("R_LOCATE_SUBSCRIBER2IP"); String sql = getRelationshipSubsciberLocateIpSql(); long start = System.currentTimeMillis(); @@ -113,7 +113,7 @@ public class BaseClickhouseData { ResultSet resultSet = statement.executeQuery(sql); while (resultSet.next()){ BaseEdgeDocument newDoc = getRelationshipSubsciberLocateIpDocument(resultSet); - putMapByHashcode(newDoc,eSubsciberLocateIpMap); + putMapByHashcode(newDoc, newRelationSubsciberLocateIpMap); } long last = System.currentTimeMillis(); LOG.info(sql + "\n读取clickhouse ESubsciberLocateIp时间:" + (last - start)); @@ -126,7 +126,7 @@ public class BaseClickhouseData { } void baseRelationshipFqdnAddressIp() { - initializeMap(eFqdnAddressIpMap); + initializeMap(newRelationFqdnAddressIpMap); LOG.info("R_LOCATE_FQDN2IP resultMap初始化完成"); String sql = getRelationshipFqdnAddressIpSql(); long start = System.currentTimeMillis(); @@ -137,7 +137,7 @@ public class BaseClickhouseData { while (resultSet.next()) { BaseEdgeDocument newDoc = getRelationFqdnAddressIpDocument(resultSet); - putMapByHashcode(newDoc,eFqdnAddressIpMap); + putMapByHashcode(newDoc, newRelationFqdnAddressIpMap); } long last = System.currentTimeMillis(); LOG.info(sql + "\n读取clickhouse EFqdnAddressIp时间:" + (last - start)); @@ -149,7 +149,7 @@ public class BaseClickhouseData { } void baseRelationshipIpVisitFqdn() { - initializeMap(eIpVisitFqdnMap); + initializeMap(newRelationIpVisitFqdnMap); LOG.info("R_VISIT_IP2FQDN resultMap初始化完成"); String sql = getRelationshipIpVisitFqdnSql(); long start = System.currentTimeMillis(); @@ -159,7 +159,7 @@ public class BaseClickhouseData { ResultSet resultSet = statement.executeQuery(sql); while (resultSet.next()) { BaseEdgeDocument newDoc = getRelationIpVisitFqdnDocument(resultSet); - putMapByHashcode(newDoc,eIpVisitFqdnMap); + putMapByHashcode(newDoc, newRelationIpVisitFqdnMap); } long last = System.currentTimeMillis(); LOG.info(sql + "\n读取clickhouse EIpVisitFqdn时间:" + (last - start)); diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java b/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java index 7eff3e6..b888411 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java @@ -1,8 +1,13 @@ package cn.ac.iie.dao; import cn.ac.iie.config.ApplicationConfig; -import cn.ac.iie.service.update.vertex.*; -import cn.ac.iie.service.update.relationship.*; +import cn.ac.iie.service.update.Document; +import cn.ac.iie.service.update.relationship.LocateFqdn2Ip; +import cn.ac.iie.service.update.relationship.LocateSubscriber2Ip; +import cn.ac.iie.service.update.relationship.VisitIp2Fqdn; +import cn.ac.iie.service.update.vertex.Fqdn; +import cn.ac.iie.service.update.vertex.Ip; +import cn.ac.iie.service.update.vertex.Subscriber; import cn.ac.iie.utils.ArangoDBConnect; import cn.ac.iie.utils.ExecutorThreadPool; import com.arangodb.entity.BaseDocument; @@ -14,8 +19,12 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.concurrent.CountDownLatch; +import static cn.ac.iie.dao.BaseArangoData.*; +import static cn.ac.iie.dao.BaseClickhouseData.*; + /** * 更新图数据库业务类 + * @author wlh */ public class UpdateGraphData { private static final Logger LOG = LoggerFactory.getLogger(UpdateGraphData.class); @@ -26,122 +35,161 @@ public class UpdateGraphData { private CountDownLatch countDownLatch; public void updateArango(){ - long startC = System.currentTimeMillis(); + long start = System.currentTimeMillis(); try { + BaseArangoData baseArangoData = new BaseArangoData(); + + baseArangoData.readHistoryData("FQDN", historyVertexFqdnMap,BaseDocument.class); updateVertexFqdn(); + + baseArangoData.readHistoryData("IP", historyVertexIpMap,BaseDocument.class); updateVertexIp(); - updateRelationFqdnAddressIp(); -// updateRelationIpVisitFqdn(); + + baseArangoData.readHistoryData("SUBSCRIBER", historyVertexSubscriberMap,BaseDocument.class); updateVertexSubscriber(); + + baseArangoData.readHistoryData("R_LOCATE_FQDN2IP", historyRelationFqdnAddressIpMap,BaseEdgeDocument.class); + updateRelationFqdnAddressIp(); + + baseArangoData.readHistoryData("R_VISIT_IP2FQDN", historyRelationIpVisitFqdnMap,BaseEdgeDocument.class); + updateRelationIpVisitFqdn(); + + baseArangoData.readHistoryData("R_LOCATE_SUBSCRIBER2IP", historyRelationSubsciberLocateIpMap,BaseEdgeDocument.class); updateRelationshipSubsciberLocateIp(); + + long last = System.currentTimeMillis(); + LOG.info("更新图数据库时间共计:"+(last - start)); }catch (Exception e){ e.printStackTrace(); }finally { arangoManger.clean(); + pool.shutdown(); } - long lastC = System.currentTimeMillis(); - LOG.info("更新ArangoDb时间:"+(lastC - startC)); } private void updateVertexFqdn(){ - baseClickhouseData.baseVertexFqdn(); try { + long start = System.currentTimeMillis(); + baseClickhouseData.baseVertexFqdn(); countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { - HashMap> stringArrayListHashMap = BaseClickhouseData.vFqdnMap.get(i); - LOG.info("vFqdn baseDocumentHashMap大小:" + stringArrayListHashMap.size()); - Fqdn updateFqdn = new Fqdn(stringArrayListHashMap, arangoManger, "FQDN", BaseArangoData.v_Fqdn_Map,countDownLatch); - updateFqdn.run(); + HashMap> tmpMap = newVertexFqdnMap.get(i); + Document updateFqdn = new Fqdn(tmpMap, arangoManger, "FQDN", historyVertexFqdnMap,countDownLatch); + pool.executor(updateFqdn); } countDownLatch.await(); - LOG.info("---------FQDN vertex 更新完毕---------"); + long last = System.currentTimeMillis(); + LOG.info("FQDN vertex 更新完毕,共耗时:"+(last-start)); }catch (Exception e){ e.printStackTrace(); + }finally { + historyVertexFqdnMap.clear(); + newVertexFqdnMap.clear(); } } private void updateVertexSubscriber(){ - baseClickhouseData.baseVertexSubscriber(); try { + long start = System.currentTimeMillis(); + baseClickhouseData.baseVertexSubscriber(); countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { - HashMap> stringArrayListHashMap = BaseClickhouseData.vSubscriberMap.get(i); - LOG.info("vSubscriber baseDocumentHashMap大小:" + stringArrayListHashMap.size()); - Subscriber updateSubscriber = new Subscriber(stringArrayListHashMap, arangoManger, "SUBSCRIBER", BaseArangoData.v_Subscriber_Map,countDownLatch); - updateSubscriber.run(); + HashMap> tmpMap = newVertexSubscriberMap.get(i); + Subscriber updateSubscriber = new Subscriber(tmpMap, arangoManger, "SUBSCRIBER", historyVertexSubscriberMap,countDownLatch); + pool.executor(updateSubscriber); } countDownLatch.await(); - LOG.info("---------SUBSCRIBER vertex 更新完毕---------"); + long last = System.currentTimeMillis(); + LOG.info("SUBSCRIBER vertex 更新完毕,共耗时:"+(last-start)); }catch (Exception e){ e.printStackTrace(); + }finally { + historyVertexSubscriberMap.clear(); + newVertexSubscriberMap.clear(); } } private void updateRelationshipSubsciberLocateIp(){ - baseClickhouseData.baseRelationshipSubscriberLocateIp(); try { + long start = System.currentTimeMillis(); + baseClickhouseData.baseRelationshipSubscriberLocateIp(); countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { - HashMap> baseDocumentHashMap = BaseClickhouseData.eSubsciberLocateIpMap.get(i); - LOG.info("ESubsciberLocateIp baseDocumentHashMap大小:" + baseDocumentHashMap.size()); - LocateSubscriber2Ip locateSubscriber2Ip = new LocateSubscriber2Ip(baseDocumentHashMap, arangoManger, "R_LOCATE_SUBSCRIBER2IP", BaseArangoData.e_Subsciber_Locate_Ip_Map, countDownLatch); - locateSubscriber2Ip.run(); + HashMap> tmpMap = newRelationSubsciberLocateIpMap.get(i); + LocateSubscriber2Ip locateSubscriber2Ip = new LocateSubscriber2Ip(tmpMap, arangoManger, "R_LOCATE_SUBSCRIBER2IP", historyRelationSubsciberLocateIpMap, countDownLatch); + pool.executor(locateSubscriber2Ip); } countDownLatch.await(); - LOG.info("------------R_LOCATE_SUBSCRIBER2IP relationship 更新完毕----------------"); + long last = System.currentTimeMillis(); + LOG.info("R_LOCATE_SUBSCRIBER2IP relationship 更新完毕,共耗时:"+(last-start)); }catch (Exception e){ e.printStackTrace(); + }finally { + historyRelationSubsciberLocateIpMap.clear(); + newRelationSubsciberLocateIpMap.clear(); } } private void updateVertexIp(){ - baseClickhouseData.baseVertexIp(); try { + long start = System.currentTimeMillis(); + baseClickhouseData.baseVertexIp(); countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { - HashMap> stringArrayListHashMap = BaseClickhouseData.vIpMap.get(i); - LOG.info("vIp baseDocumentHashMap大小:" + stringArrayListHashMap.size()); - Ip updateIp = new Ip(stringArrayListHashMap, arangoManger, "IP", BaseArangoData.v_Ip_Map, countDownLatch); - updateIp.run(); + HashMap> tmpMap = newVertexIpMap.get(i); + Ip updateIp = new Ip(tmpMap, arangoManger, "IP", historyVertexIpMap, countDownLatch); + pool.executor(updateIp); } countDownLatch.await(); - LOG.info("----------IP vertex 更新完毕-------------"); + long last = System.currentTimeMillis(); + LOG.info("IP vertex 更新完毕,共耗时:"+(last-start)); }catch (Exception e){ e.printStackTrace(); + }finally { + historyVertexIpMap.clear(); + newVertexIpMap.clear(); } } private void updateRelationFqdnAddressIp(){ - baseClickhouseData.baseRelationshipFqdnAddressIp(); try { + long start = System.currentTimeMillis(); + baseClickhouseData.baseRelationshipFqdnAddressIp(); countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { - HashMap> baseDocumentHashMap = BaseClickhouseData.eFqdnAddressIpMap.get(i); - LOG.info("EFqdnAddressIp baseDocumentHashMap大小:" + baseDocumentHashMap.size()); - LocateFqdn2Ip fqdnAddressIp = new LocateFqdn2Ip(baseDocumentHashMap, arangoManger, "R_LOCATE_FQDN2IP", BaseArangoData.e_Fqdn_Address_Ip_Map, countDownLatch); - fqdnAddressIp.run(); + HashMap> tmpMap = newRelationFqdnAddressIpMap.get(i); + LocateFqdn2Ip fqdnAddressIp = new LocateFqdn2Ip(tmpMap, arangoManger, "R_LOCATE_FQDN2IP", historyRelationFqdnAddressIpMap, countDownLatch); + pool.executor(fqdnAddressIp); } countDownLatch.await(); - LOG.info("------------R_LOCATE_FQDN2IP relationship 更新完毕----------------"); + long last = System.currentTimeMillis(); + LOG.info("R_LOCATE_FQDN2IP relationship 更新完毕,共耗时:"+(last-start)); }catch (Exception e){ e.printStackTrace(); + }finally { + historyRelationFqdnAddressIpMap.clear(); + newRelationFqdnAddressIpMap.clear(); } } private void updateRelationIpVisitFqdn(){ - baseClickhouseData.baseRelationshipIpVisitFqdn(); try { + long start = System.currentTimeMillis(); + baseClickhouseData.baseRelationshipIpVisitFqdn(); countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { - HashMap> baseDocumentHashMap = BaseClickhouseData.eIpVisitFqdnMap.get(i); - LOG.info("EIpVisitFqdn baseDocumentHashMap大小:" + baseDocumentHashMap.size()); - VisitIp2Fqdn ipVisitFqdn = new VisitIp2Fqdn(baseDocumentHashMap,arangoManger,"R_VISIT_IP2FQDN", BaseArangoData.e_Ip_Visit_Fqdn_Map,countDownLatch); - ipVisitFqdn.run(); + HashMap> tmpMap = newRelationIpVisitFqdnMap.get(i); + VisitIp2Fqdn ipVisitFqdn = new VisitIp2Fqdn(tmpMap,arangoManger,"R_VISIT_IP2FQDN", historyRelationIpVisitFqdnMap,countDownLatch); + pool.executor(ipVisitFqdn); } countDownLatch.await(); - LOG.info("---------------R_VISIT_IP2FQDN ralationship 更新完毕----------------"); + long last = System.currentTimeMillis(); + LOG.info("R_VISIT_IP2FQDN ralationship 更新完毕,共耗时:"+(last-start)); }catch (Exception e){ e.printStackTrace(); + }finally { + historyRelationIpVisitFqdnMap.clear(); + newRelationIpVisitFqdnMap.clear(); } } diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/read/ReadClickhouseData.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadClickhouseData.java similarity index 99% rename from IP-learning-graph/src/main/java/cn/ac/iie/service/read/ReadClickhouseData.java rename to IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadClickhouseData.java index 2e24e5c..275491c 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/service/read/ReadClickhouseData.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadClickhouseData.java @@ -1,4 +1,4 @@ -package cn.ac.iie.service.read; +package cn.ac.iie.service.ingestion; import cn.ac.iie.config.ApplicationConfig; import com.arangodb.entity.BaseDocument; diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadHistoryArangoData.java similarity index 56% rename from IP-learning-graph/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java rename to IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadHistoryArangoData.java index 1098b24..03baf7f 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadHistoryArangoData.java @@ -1,4 +1,4 @@ -package cn.ac.iie.service.read; +package cn.ac.iie.service.ingestion; import cn.ac.iie.utils.ArangoDBConnect; import com.arangodb.ArangoCursor; @@ -9,6 +9,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; /** * @author wlh @@ -21,39 +22,47 @@ public class ReadHistoryArangoData extends Thread { private String query; private ConcurrentHashMap map; private Class type; - private String collectionName; + private String table; + private CountDownLatch countDownLatch; - public ReadHistoryArangoData(ArangoDBConnect arangoConnect, String query, ConcurrentHashMap map, Class type, String collectionName) { + public ReadHistoryArangoData(ArangoDBConnect arangoConnect, String query, ConcurrentHashMap map, Class type, String table, CountDownLatch countDownLatch) { this.arangoConnect = arangoConnect; this.query = query; this.map = map; this.type = type; - this.collectionName = collectionName; + this.table = table; + this.countDownLatch = countDownLatch; } @Override public void run() { - long s = System.currentTimeMillis(); - ArangoCursor docs = arangoConnect.executorQuery(query, type); - if (docs != null) { - List baseDocuments = docs.asListRemaining(); - int i = 0; - for (T doc : baseDocuments) { - String key = doc.getKey(); - switch (collectionName) { - case "R_LOCATE_FQDN2IP": - updateProtocolDocument(doc); - break; - case "R_VISIT_IP2FQDN": - updateProtocolDocument(doc); - break; - default: + try { + long s = System.currentTimeMillis(); + ArangoCursor docs = arangoConnect.executorQuery(query, type); + if (docs != null) { + List baseDocuments = docs.asListRemaining(); + int i = 0; + for (T doc : baseDocuments) { + String key = doc.getKey(); + switch (table) { + case "R_LOCATE_FQDN2IP": + updateProtocolDocument(doc); + break; + case "R_VISIT_IP2FQDN": + updateProtocolDocument(doc); + break; + default: + } + map.put(key, doc); + i++; } - map.put(key, doc); - i++; + long l = System.currentTimeMillis(); + LOG.info(query + "\n读取数据" + i + "条,运行时间:" + (l - s)); } - long l = System.currentTimeMillis(); - LOG.info(query + "\n处理数据" + i + "条,运行时间:" + (l - s)); + }catch (Exception e){ + e.printStackTrace(); + }finally { + countDownLatch.countDown(); } } diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Relationship.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Relationship.java index 29e6ec2..4047478 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Relationship.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Relationship.java @@ -1,6 +1,6 @@ package cn.ac.iie.service.update; -import cn.ac.iie.service.read.ReadClickhouseData; +import cn.ac.iie.service.ingestion.ReadClickhouseData; import cn.ac.iie.utils.ArangoDBConnect; import com.arangodb.entity.BaseEdgeDocument; diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/relationship/LocateFqdn2Ip.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/relationship/LocateFqdn2Ip.java index 9f95b51..21904cc 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/relationship/LocateFqdn2Ip.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/relationship/LocateFqdn2Ip.java @@ -1,6 +1,6 @@ package cn.ac.iie.service.update.relationship; -import cn.ac.iie.service.read.ReadClickhouseData; +import cn.ac.iie.service.ingestion.ReadClickhouseData; import cn.ac.iie.service.update.Relationship; import cn.ac.iie.utils.ArangoDBConnect; import com.arangodb.entity.BaseEdgeDocument; diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/relationship/VisitIp2Fqdn.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/relationship/VisitIp2Fqdn.java index f283d84..431927f 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/relationship/VisitIp2Fqdn.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/relationship/VisitIp2Fqdn.java @@ -1,6 +1,6 @@ package cn.ac.iie.service.update.relationship; -import cn.ac.iie.service.read.ReadClickhouseData; +import cn.ac.iie.service.ingestion.ReadClickhouseData; import cn.ac.iie.service.update.Relationship; import cn.ac.iie.utils.ArangoDBConnect; import com.arangodb.entity.BaseEdgeDocument; diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java b/IP-learning-graph/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java index e29202d..1165eee 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java @@ -1,25 +1,12 @@ package cn.ac.iie.test; -import cn.ac.iie.dao.BaseArangoData; import cn.ac.iie.dao.UpdateGraphData; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - public class IpLearningApplicationTest { - private static final Logger LOG = LoggerFactory.getLogger(IpLearningApplicationTest.class); public static void main(String[] args) { - long start = System.currentTimeMillis(); - LOG.info("Ip Learning Application开始运行"); - BaseArangoData baseArangoData = new BaseArangoData(); - baseArangoData.baseDocumentDataMap(); - - LOG.info("历史数据读取完成,开始更新数据"); UpdateGraphData updateGraphData = new UpdateGraphData(); updateGraphData.updateArango(); - long last = System.currentTimeMillis(); - LOG.info("共计运行时间:"+(last - start)); } } diff --git a/IP-learning-graph/src/main/resources/application.properties b/IP-learning-graph/src/main/resources/application.properties index 1d2be99..e7f5186 100644 --- a/IP-learning-graph/src/main/resources/application.properties +++ b/IP-learning-graph/src/main/resources/application.properties @@ -1,5 +1,5 @@ #arangoDB参数配置 -arangoDB.host=192.168.40.182 +arangoDB.host=192.168.40.127 arangoDB.port=8529 arangoDB.user=root arangoDB.password=111111