From a6d212aa372b22e521da3350136e72a6c01ab49b Mon Sep 17 00:00:00 2001 From: wanglihui <949764788@qq.com> Date: Fri, 17 Jul 2020 19:28:03 +0800 Subject: [PATCH] Add a PROTOCOL_TYPE helper field for the calculations, rework the calculation logic, and maintain per-protocol occurrence counts over the last 24 hours MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/cn/ac/iie/dao/BaseArangoData.java | 49 ++--- .../cn/ac/iie/dao/BaseClickhouseData.java | 64 +++--- .../java/cn/ac/iie/dao/UpdateGraphData.java | 57 +++--- .../iie/service/read/ReadClickhouseData.java | 82 +++++--- .../service/read/ReadHistoryArangoData.java | 51 ++++- .../relationship/LocateSubscriber2Ip.java | 38 ---- .../service/relationship/VisitIp2Fqdn.java | 19 -- .../cn/ac/iie/service/update/Document.java | 118 +++++++++++ .../ac/iie/service/update/Relationship.java | 193 +++++------------- .../java/cn/ac/iie/service/update/Vertex.java | 117 ++--------- .../relationship/LocateFqdn2Ip.java | 38 +--- .../relationship/LocateSubscriber2Ip.java | 21 ++ .../update/relationship/VisitIp2Fqdn.java | 36 ++++ .../iie/service/{ => update}/vertex/Fqdn.java | 5 +- .../iie/service/{ => update}/vertex/Ip.java | 21 +- .../{ => update}/vertex/Subscriber.java | 5 +- .../java/cn/ac/iie/utils/ArangoDBConnect.java | 1 - .../src/main/resources/application.properties | 7 +- 18 files changed, 426 insertions(+), 496 deletions(-) delete mode 100644 IP-learning-graph/src/main/java/cn/ac/iie/service/relationship/LocateSubscriber2Ip.java delete mode 100644 IP-learning-graph/src/main/java/cn/ac/iie/service/relationship/VisitIp2Fqdn.java create mode 100644 IP-learning-graph/src/main/java/cn/ac/iie/service/update/Document.java rename IP-learning-graph/src/main/java/cn/ac/iie/service/{ => update}/relationship/LocateFqdn2Ip.java (66%) create mode 100644 IP-learning-graph/src/main/java/cn/ac/iie/service/update/relationship/LocateSubscriber2Ip.java create mode 100644 IP-learning-graph/src/main/java/cn/ac/iie/service/update/relationship/VisitIp2Fqdn.java rename IP-learning-graph/src/main/java/cn/ac/iie/service/{ => update}/vertex/Fqdn.java (79%) rename IP-learning-graph/src/main/java/cn/ac/iie/service/{ => update}/vertex/Ip.java (89%) rename IP-learning-graph/src/main/java/cn/ac/iie/service/{ => update}/vertex/Subscriber.java (79%) diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java index 45cebee..e4e77ca 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java @@ -10,8 +10,6 @@ import com.arangodb.entity.BaseEdgeDocument; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Enumeration; import java.util.concurrent.ConcurrentHashMap; /** @@ -20,25 +18,25 @@ import java.util.concurrent.ConcurrentHashMap; public class BaseArangoData { private static final Logger LOG = LoggerFactory.getLogger(BaseArangoData.class); - static ConcurrentHashMap<String, BaseEdgeDocument> v_Fqdn_Map = new ConcurrentHashMap<>(); - static ConcurrentHashMap<String, BaseEdgeDocument> v_Ip_Map = new ConcurrentHashMap<>(); - static ConcurrentHashMap<String, BaseEdgeDocument> v_Subscriber_Map = new ConcurrentHashMap<>(); + static ConcurrentHashMap<String, BaseDocument> v_Fqdn_Map = new ConcurrentHashMap<>(); + static 
ConcurrentHashMap v_Ip_Map = new ConcurrentHashMap<>(); + static ConcurrentHashMap v_Subscriber_Map = new ConcurrentHashMap<>(); static ConcurrentHashMap e_Fqdn_Address_Ip_Map = new ConcurrentHashMap<>(); static ConcurrentHashMap e_Ip_Visit_Fqdn_Map = new ConcurrentHashMap<>(); static ConcurrentHashMap e_Subsciber_Locate_Ip_Map = new ConcurrentHashMap<>(); - private static final ArangoDBConnect arangoDBConnect = ArangoDBConnect.getInstance(); + private static ArangoDBConnect arangoDBConnect = ArangoDBConnect.getInstance(); - private static final ExecutorThreadPool threadPool = ExecutorThreadPool.getInstance(); + private static ExecutorThreadPool threadPool = ExecutorThreadPool.getInstance(); public void baseDocumentDataMap(){ long startA = System.currentTimeMillis(); - readHistoryData("FQDN", v_Fqdn_Map); - readHistoryData("IP", v_Ip_Map); - readHistoryData("SUBSCRIBER",v_Subscriber_Map); - readHistoryData("R_LOCATE_FQDN2IP", e_Fqdn_Address_Ip_Map); -// readHistoryData("R_VISIT_IP2FQDN", e_Ip_Visit_Fqdn_Map); - readHistoryData("R_LOCATE_SUBSCRIBER2IP",e_Subsciber_Locate_Ip_Map); + readHistoryData("FQDN", v_Fqdn_Map,BaseDocument.class); + readHistoryData("IP", v_Ip_Map,BaseDocument.class); + readHistoryData("SUBSCRIBER",v_Subscriber_Map,BaseDocument.class); + readHistoryData("R_LOCATE_FQDN2IP", e_Fqdn_Address_Ip_Map,BaseEdgeDocument.class); +// readHistoryData("R_VISIT_IP2FQDN", e_Ip_Visit_Fqdn_Map,BaseEdgeDocument.class); + readHistoryData("R_LOCATE_SUBSCRIBER2IP",e_Subsciber_Locate_Ip_Map,BaseEdgeDocument.class); threadPool.shutdown(); threadPool.awaitThreadTask(); LOG.info("v_Fqdn_Map大小:"+v_Fqdn_Map.size()); @@ -51,29 +49,12 @@ public class BaseArangoData { LOG.info("读取ArangoDb时间:"+(lastA - startA)); } - public static void main(String[] args) { - new BaseArangoData().readHistoryData("IP", v_Ip_Map); - threadPool.shutdown(); - threadPool.awaitThreadTask(); - ArrayList baseEdgeDocuments = new ArrayList<>(); - Enumeration keys = v_Ip_Map.keys(); - while (keys.hasMoreElements()){ - String key = keys.nextElement(); - BaseEdgeDocument baseEdgeDocument = v_Ip_Map.get(key); - baseEdgeDocument.addAttribute("COMMON_LINK_INFO",""); - baseEdgeDocuments.add(baseEdgeDocument); - } - arangoDBConnect.overwrite(baseEdgeDocuments,"IP"); - arangoDBConnect.clean(); - - } - - private void readHistoryData(String table, ConcurrentHashMap map){ + private void readHistoryData(String collectionName, ConcurrentHashMap map, Class type){ try { - long[] timeRange = getTimeRange(table); + long[] timeRange = getTimeRange(collectionName); for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { - String sql = getQuerySql(timeRange, i, table); - ReadHistoryArangoData readHistoryArangoData = new ReadHistoryArangoData(arangoDBConnect, sql, map); + String sql = getQuerySql(timeRange, i, collectionName); + ReadHistoryArangoData readHistoryArangoData = new ReadHistoryArangoData<>(arangoDBConnect, sql, map,type,collectionName); threadPool.executor(readHistoryArangoData); } }catch (Exception e){ diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java index b9e003d..b33b73c 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java @@ -12,31 +12,31 @@ import java.sql.ResultSet; import java.sql.Statement; import java.util.ArrayList; import java.util.HashMap; -import java.util.Map; import static 
cn.ac.iie.service.read.ReadClickhouseData.*; /** * 读取clickhouse数据,封装到map + * @author wlh */ public class BaseClickhouseData { private static final Logger LOG = LoggerFactory.getLogger(BaseClickhouseData.class); - private static final ClickhouseConnect manger = ClickhouseConnect.getInstance(); + private static ClickhouseConnect manger = ClickhouseConnect.getInstance(); static HashMap>> vFqdnMap = new HashMap<>(); static HashMap>> vIpMap = new HashMap<>(); - static HashMap>> vSubscriberMap = new HashMap<>(); - static HashMap>> eFqdnAddressIpMap = new HashMap<>(); - static HashMap>> eIpVisitFqdnMap = new HashMap<>(); - static HashMap>> eSubsciberLocateIpMap = new HashMap<>(); + static HashMap>> vSubscriberMap = new HashMap<>(); + static HashMap>> eFqdnAddressIpMap = new HashMap<>(); + static HashMap>> eIpVisitFqdnMap = new HashMap<>(); + static HashMap>> eSubsciberLocateIpMap = new HashMap<>(); private DruidPooledConnection connection; private Statement statement; - void BaseVFqdn() { + void baseVertexFqdn() { initializeMap(vFqdnMap); LOG.info("FQDN resultMap初始化完成"); - String sql = getVFqdnSql(); + String sql = getVertexFqdnSql(); long start = System.currentTimeMillis(); try { connection = manger.getConnection(); @@ -45,12 +45,7 @@ public class BaseClickhouseData { while (resultSet.next()) { BaseDocument newDoc = getVertexFqdnDocument(resultSet); if (newDoc != null) { - String fqdnName = newDoc.getKey(); - int i = Math.abs(fqdnName.hashCode()) % ApplicationConfig.THREAD_POOL_NUMBER; - HashMap> documentHashMap = vFqdnMap.getOrDefault(i, new HashMap<>()); - ArrayList documentArrayList = documentHashMap.getOrDefault(fqdnName, new ArrayList<>()); - documentArrayList.add(newDoc); - documentHashMap.put(fqdnName,documentArrayList); + putMapByHashcode(newDoc,vFqdnMap); } } long last = System.currentTimeMillis(); @@ -62,10 +57,10 @@ public class BaseClickhouseData { } } - void BaseVIp() { + void baseVertexIp() { initializeMap(vIpMap); LOG.info("IP resultMap初始化完成"); - String sql = getVIpSql(); + String sql = getVertexIpSql(); long start = System.currentTimeMillis(); try { connection = manger.getConnection(); @@ -73,12 +68,7 @@ public class BaseClickhouseData { ResultSet resultSet = statement.executeQuery(sql); while (resultSet.next()) { BaseDocument newDoc = getVertexIpDocument(resultSet); - String ip = newDoc.getKey(); - int i = Math.abs(ip.hashCode()) % ApplicationConfig.THREAD_POOL_NUMBER; - HashMap> documentHashMap = vIpMap.getOrDefault(i, new HashMap<>()); - ArrayList documentArrayList = documentHashMap.getOrDefault(ip, new ArrayList<>()); - documentArrayList.add(newDoc); - documentHashMap.put(ip,documentArrayList); + putMapByHashcode(newDoc,vIpMap); } long last = System.currentTimeMillis(); LOG.info(sql + "\n读取clickhouse v_IP时间:" + (last - start)); @@ -89,7 +79,7 @@ public class BaseClickhouseData { } } - void BaseVertexSubscriber(){ + void baseVertexSubscriber(){ initializeMap(vSubscriberMap); LOG.info("SUBSCRIBER resultMap初始化完成"); String sql = getVertexSubscriberSql(); @@ -100,12 +90,7 @@ public class BaseClickhouseData { ResultSet resultSet = statement.executeQuery(sql); while (resultSet.next()){ BaseDocument newDoc = getVertexSubscriberDocument(resultSet); - String key = newDoc.getKey(); - int i = Math.abs(key.hashCode()) % ApplicationConfig.THREAD_POOL_NUMBER; - HashMap> documentHashMap = vSubscriberMap.getOrDefault(i, new HashMap<>()); - ArrayList documentArrayList = documentHashMap.getOrDefault(key, new ArrayList<>()); - documentArrayList.add(newDoc); - 
documentHashMap.put(key,documentArrayList); + putMapByHashcode(newDoc,vSubscriberMap); } long last = System.currentTimeMillis(); LOG.info(sql + "\n读取clickhouse v_SUBSCRIBER时间:" + (last - start)); @@ -117,7 +102,7 @@ public class BaseClickhouseData { } } - void BaseRelationshipSubscriberLocateIp(){ + void baseRelationshipSubscriberLocateIp(){ initializeMap(eSubsciberLocateIpMap); LOG.info("R_LOCATE_SUBSCRIBER2IP"); String sql = getRelationshipSubsciberLocateIpSql(); @@ -128,8 +113,7 @@ public class BaseClickhouseData { ResultSet resultSet = statement.executeQuery(sql); while (resultSet.next()){ BaseEdgeDocument newDoc = getRelationshipSubsciberLocateIpDocument(resultSet); - String key = newDoc.getKey(); - putMapByHashcode(newDoc, eSubsciberLocateIpMap,key); + putMapByHashcode(newDoc,eSubsciberLocateIpMap); } long last = System.currentTimeMillis(); LOG.info(sql + "\n读取clickhouse ESubsciberLocateIp时间:" + (last - start)); @@ -141,10 +125,10 @@ public class BaseClickhouseData { } } - void BaseEFqdnAddressIp() { + void baseRelationshipFqdnAddressIp() { initializeMap(eFqdnAddressIpMap); LOG.info("R_LOCATE_FQDN2IP resultMap初始化完成"); - String sql = getEFqdnAddressIpSql(); + String sql = getRelationshipFqdnAddressIpSql(); long start = System.currentTimeMillis(); try { connection = manger.getConnection(); @@ -153,8 +137,7 @@ public class BaseClickhouseData { while (resultSet.next()) { BaseEdgeDocument newDoc = getRelationFqdnAddressIpDocument(resultSet); - String commonSchemaType = resultSet.getString("common_schema_type"); - putMapByHashcode(newDoc, eFqdnAddressIpMap,commonSchemaType); + putMapByHashcode(newDoc,eFqdnAddressIpMap); } long last = System.currentTimeMillis(); LOG.info(sql + "\n读取clickhouse EFqdnAddressIp时间:" + (last - start)); @@ -165,10 +148,10 @@ public class BaseClickhouseData { } } - void BaseEIpVisitFqdn() { + void baseRelationshipIpVisitFqdn() { initializeMap(eIpVisitFqdnMap); LOG.info("R_VISIT_IP2FQDN resultMap初始化完成"); - String sql = getEIpVisitFqdnSql(); + String sql = getRelationshipIpVisitFqdnSql(); long start = System.currentTimeMillis(); try { connection = manger.getConnection(); @@ -176,8 +159,7 @@ public class BaseClickhouseData { ResultSet resultSet = statement.executeQuery(sql); while (resultSet.next()) { BaseEdgeDocument newDoc = getRelationIpVisitFqdnDocument(resultSet); - String commonSchemaType = resultSet.getString("common_schema_type"); - putMapByHashcode(newDoc, eIpVisitFqdnMap,commonSchemaType); + putMapByHashcode(newDoc,eIpVisitFqdnMap); } long last = System.currentTimeMillis(); LOG.info(sql + "\n读取clickhouse EIpVisitFqdn时间:" + (last - start)); @@ -188,7 +170,7 @@ public class BaseClickhouseData { } } - private void initializeMap(Map map){ + private void initializeMap(HashMap>> map){ try { for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { map.put(i, new HashMap<>()); diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java b/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java index b0bed1a..7eff3e6 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java @@ -1,12 +1,8 @@ package cn.ac.iie.dao; import cn.ac.iie.config.ApplicationConfig; -import cn.ac.iie.service.relationship.LocateFqdn2Ip; -import cn.ac.iie.service.relationship.LocateSubscriber2Ip; -import cn.ac.iie.service.relationship.VisitIp2Fqdn; -import cn.ac.iie.service.vertex.Fqdn; -import cn.ac.iie.service.vertex.Ip; -import 
cn.ac.iie.service.vertex.Subscriber; +import cn.ac.iie.service.update.vertex.*; +import cn.ac.iie.service.update.relationship.*; import cn.ac.iie.utils.ArangoDBConnect; import cn.ac.iie.utils.ExecutorThreadPool; import com.arangodb.entity.BaseDocument; @@ -26,28 +22,17 @@ public class UpdateGraphData { private static ExecutorThreadPool pool = ExecutorThreadPool.getInstance(); private static ArangoDBConnect arangoManger = ArangoDBConnect.getInstance(); + private static BaseClickhouseData baseClickhouseData = new BaseClickhouseData(); private CountDownLatch countDownLatch; public void updateArango(){ long startC = System.currentTimeMillis(); try { - BaseClickhouseData baseClickhouseData = new BaseClickhouseData(); - baseClickhouseData.BaseVFqdn(); updateVertexFqdn(); - - baseClickhouseData.BaseVIp(); updateVertexIp(); - - baseClickhouseData.BaseEFqdnAddressIp(); updateRelationFqdnAddressIp(); - -// baseClickhouseData.BaseEIpVisitFqdn(); // updateRelationIpVisitFqdn(); - - baseClickhouseData.BaseVertexSubscriber(); updateVertexSubscriber(); - - baseClickhouseData.BaseRelationshipSubscriberLocateIp(); updateRelationshipSubsciberLocateIp(); }catch (Exception e){ e.printStackTrace(); @@ -59,13 +44,14 @@ public class UpdateGraphData { } private void updateVertexFqdn(){ + baseClickhouseData.baseVertexFqdn(); try { countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { HashMap> stringArrayListHashMap = BaseClickhouseData.vFqdnMap.get(i); LOG.info("vFqdn baseDocumentHashMap大小:" + stringArrayListHashMap.size()); - Fqdn updateVFqdn = new Fqdn(stringArrayListHashMap, arangoManger, "FQDN", BaseArangoData.v_Fqdn_Map,countDownLatch); - updateVFqdn.run(); + Fqdn updateFqdn = new Fqdn(stringArrayListHashMap, arangoManger, "FQDN", BaseArangoData.v_Fqdn_Map,countDownLatch); + updateFqdn.run(); } countDownLatch.await(); LOG.info("---------FQDN vertex 更新完毕---------"); @@ -75,13 +61,14 @@ public class UpdateGraphData { } private void updateVertexSubscriber(){ + baseClickhouseData.baseVertexSubscriber(); try { countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { HashMap> stringArrayListHashMap = BaseClickhouseData.vSubscriberMap.get(i); LOG.info("vSubscriber baseDocumentHashMap大小:" + stringArrayListHashMap.size()); - Subscriber updateVSubscriber = new Subscriber(stringArrayListHashMap, arangoManger, "SUBSCRIBER", BaseArangoData.v_Subscriber_Map,countDownLatch); - updateVSubscriber.run(); + Subscriber updateSubscriber = new Subscriber(stringArrayListHashMap, arangoManger, "SUBSCRIBER", BaseArangoData.v_Subscriber_Map,countDownLatch); + updateSubscriber.run(); } countDownLatch.await(); LOG.info("---------SUBSCRIBER vertex 更新完毕---------"); @@ -91,13 +78,14 @@ public class UpdateGraphData { } private void updateRelationshipSubsciberLocateIp(){ + baseClickhouseData.baseRelationshipSubscriberLocateIp(); try { countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { - HashMap> baseDocumentHashMap = BaseClickhouseData.eSubsciberLocateIpMap.get(i); + HashMap> baseDocumentHashMap = BaseClickhouseData.eSubsciberLocateIpMap.get(i); LOG.info("ESubsciberLocateIp baseDocumentHashMap大小:" + baseDocumentHashMap.size()); - LocateSubscriber2Ip rLocateSubscriber2IP = new LocateSubscriber2Ip(baseDocumentHashMap, arangoManger, "R_LOCATE_SUBSCRIBER2IP", 
BaseArangoData.e_Subsciber_Locate_Ip_Map, countDownLatch); - rLocateSubscriber2IP.run(); + LocateSubscriber2Ip locateSubscriber2Ip = new LocateSubscriber2Ip(baseDocumentHashMap, arangoManger, "R_LOCATE_SUBSCRIBER2IP", BaseArangoData.e_Subsciber_Locate_Ip_Map, countDownLatch); + locateSubscriber2Ip.run(); } countDownLatch.await(); LOG.info("------------R_LOCATE_SUBSCRIBER2IP relationship 更新完毕----------------"); @@ -107,13 +95,14 @@ public class UpdateGraphData { } private void updateVertexIp(){ + baseClickhouseData.baseVertexIp(); try { countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { HashMap> stringArrayListHashMap = BaseClickhouseData.vIpMap.get(i); LOG.info("vIp baseDocumentHashMap大小:" + stringArrayListHashMap.size()); - Ip updateVIp = new Ip(stringArrayListHashMap, arangoManger, "IP", BaseArangoData.v_Ip_Map, countDownLatch); - updateVIp.run(); + Ip updateIp = new Ip(stringArrayListHashMap, arangoManger, "IP", BaseArangoData.v_Ip_Map, countDownLatch); + updateIp.run(); } countDownLatch.await(); LOG.info("----------IP vertex 更新完毕-------------"); @@ -123,13 +112,14 @@ public class UpdateGraphData { } private void updateRelationFqdnAddressIp(){ + baseClickhouseData.baseRelationshipFqdnAddressIp(); try { countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { - HashMap> baseDocumentHashMap = BaseClickhouseData.eFqdnAddressIpMap.get(i); + HashMap> baseDocumentHashMap = BaseClickhouseData.eFqdnAddressIpMap.get(i); LOG.info("EFqdnAddressIp baseDocumentHashMap大小:" + baseDocumentHashMap.size()); - LocateFqdn2Ip updateEFqdnAddressIp = new LocateFqdn2Ip(baseDocumentHashMap, arangoManger, "R_LOCATE_FQDN2IP", BaseArangoData.e_Fqdn_Address_Ip_Map, countDownLatch); - updateEFqdnAddressIp.run(); + LocateFqdn2Ip fqdnAddressIp = new LocateFqdn2Ip(baseDocumentHashMap, arangoManger, "R_LOCATE_FQDN2IP", BaseArangoData.e_Fqdn_Address_Ip_Map, countDownLatch); + fqdnAddressIp.run(); } countDownLatch.await(); LOG.info("------------R_LOCATE_FQDN2IP relationship 更新完毕----------------"); @@ -139,13 +129,14 @@ public class UpdateGraphData { } private void updateRelationIpVisitFqdn(){ + baseClickhouseData.baseRelationshipIpVisitFqdn(); try { countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { - HashMap> baseDocumentHashMap = BaseClickhouseData.eIpVisitFqdnMap.get(i); + HashMap> baseDocumentHashMap = BaseClickhouseData.eIpVisitFqdnMap.get(i); LOG.info("EIpVisitFqdn baseDocumentHashMap大小:" + baseDocumentHashMap.size()); - VisitIp2Fqdn updateEIpVisitFqdn = new VisitIp2Fqdn(baseDocumentHashMap,arangoManger,"R_VISIT_IP2FQDN",BaseArangoData.e_Ip_Visit_Fqdn_Map,countDownLatch); - updateEIpVisitFqdn.run(); + VisitIp2Fqdn ipVisitFqdn = new VisitIp2Fqdn(baseDocumentHashMap,arangoManger,"R_VISIT_IP2FQDN", BaseArangoData.e_Ip_Visit_Fqdn_Map,countDownLatch); + ipVisitFqdn.run(); } countDownLatch.await(); LOG.info("---------------R_VISIT_IP2FQDN ralationship 更新完毕----------------"); diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/read/ReadClickhouseData.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/read/ReadClickhouseData.java index d75742e..2e24e5c 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/service/read/ReadClickhouseData.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/read/ReadClickhouseData.java @@ -8,9 +8,14 @@ import 
org.slf4j.LoggerFactory; import java.sql.ResultSet; import java.sql.SQLException; +import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.regex.Pattern; +/** + * @author wlh + */ public class ReadClickhouseData { public static long currentHour = System.currentTimeMillis() / (60 * 60 * 1000) * 60 * 60; @@ -18,6 +23,16 @@ public class ReadClickhouseData { private static Pattern pattern = Pattern.compile("^[\\d]*$"); private static final Logger LOG = LoggerFactory.getLogger(ReadClickhouseData.class); + + public static HashSet protocolSet; + + static { + protocolSet = new HashSet<>(); + protocolSet.add("HTTP"); + protocolSet.add("TLS"); + protocolSet.add("DNS"); + } + public static BaseDocument getVertexFqdnDocument(ResultSet resultSet) throws SQLException { String fqdnName = resultSet.getString("FQDN"); BaseDocument newDoc = null; @@ -49,17 +64,18 @@ public class ReadClickhouseData { case "client": newDoc.addAttribute("CLIENT_SESSION_COUNT", sessionCount); newDoc.addAttribute("CLIENT_BYTES_SUM", bytesSum); - newDoc.addAttribute("SERVER_SESSION_COUNT",0L); - newDoc.addAttribute("SERVER_BYTES_SUM",0L); + newDoc.addAttribute("SERVER_SESSION_COUNT", 0L); + newDoc.addAttribute("SERVER_BYTES_SUM", 0L); break; case "server": newDoc.addAttribute("SERVER_SESSION_COUNT", sessionCount); newDoc.addAttribute("SERVER_BYTES_SUM", bytesSum); - newDoc.addAttribute("CLIENT_SESSION_COUNT",0L); - newDoc.addAttribute("CLIENT_BYTES_SUM",0L); + newDoc.addAttribute("CLIENT_SESSION_COUNT", 0L); + newDoc.addAttribute("CLIENT_BYTES_SUM", 0L); break; + default: } - newDoc.addAttribute("COMMON_LINK_INFO",""); + newDoc.addAttribute("COMMON_LINK_INFO", ""); return newDoc; } @@ -102,9 +118,10 @@ public class ReadClickhouseData { long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); long countTotal = resultSet.getLong("COUNT_TOTAL"); + String schemaType = resultSet.getString("schema_type"); String[] distCipRecents = (String[]) resultSet.getArray("DIST_CIP_RECENT").getArray(); long[] clientIpTs = new long[distCipRecents.length]; - for (int i = 0;i < clientIpTs.length;i++){ + for (int i = 0; i < clientIpTs.length; i++) { clientIpTs[i] = currentHour; } @@ -115,10 +132,10 @@ public class ReadClickhouseData { newDoc.setTo("IP/" + vIp); newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime); newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime); - newDoc.addAttribute("COUNT_TOTAL", countTotal); newDoc.addAttribute("DIST_CIP", distCipRecents); - newDoc.addAttribute("DIST_CIP_TS",clientIpTs); - + newDoc.addAttribute("DIST_CIP_TS", clientIpTs); + newDoc.addAttribute("PROTOCOL_TYPE", schemaType); + checkSchemaProperty(newDoc, schemaType, countTotal); } return newDoc; } @@ -132,6 +149,7 @@ public class ReadClickhouseData { long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); long countTotal = resultSet.getLong("COUNT_TOTAL"); + String schemaType = resultSet.getString("schema_type"); newDoc = new BaseEdgeDocument(); newDoc.setKey(key); @@ -139,20 +157,20 @@ public class ReadClickhouseData { newDoc.setTo("FQDN/" + vFqdn); newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime); newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime); - newDoc.addAttribute("COUNT_TOTAL", countTotal); + newDoc.addAttribute("PROTOCOL_TYPE", schemaType); + checkSchemaProperty(newDoc, schemaType, countTotal); } return newDoc; } - public static void putMapByHashcode(BaseEdgeDocument 
newDoc, HashMap>> map, String schema) throws SQLException { + public static void putMapByHashcode(T newDoc, HashMap>> map) { if (newDoc != null) { String key = newDoc.getKey(); int i = Math.abs(key.hashCode()) % ApplicationConfig.THREAD_POOL_NUMBER; - HashMap> documentHashMap = map.getOrDefault(i, new HashMap()); - - HashMap schemaHashMap = documentHashMap.getOrDefault(key, new HashMap<>()); - schemaHashMap.put(schema, newDoc); - documentHashMap.put(key, schemaHashMap); + HashMap> documentHashMap = map.getOrDefault(i, new HashMap<>()); + ArrayList documentArrayList = documentHashMap.getOrDefault(key, new ArrayList<>()); + documentArrayList.add(newDoc); + documentHashMap.put(key, documentArrayList); } } @@ -179,7 +197,23 @@ public class ReadClickhouseData { return false; } - public static String getVFqdnSql() { + private static void checkSchemaProperty(BaseEdgeDocument newDoc, String schema, long countTotal) { + long[] recentCnt = new long[24]; + recentCnt[0] = countTotal; + for (String protocol:protocolSet){ + String protocolRecent = protocol +"_CNT_RECENT"; + String protocolTotal = protocol + "_CNT_TOTAL"; + if (protocol.equals(schema)){ + newDoc.addAttribute(protocolTotal, countTotal); + newDoc.addAttribute(protocolRecent, recentCnt); + }else { + newDoc.addAttribute(protocolTotal, 0L); + newDoc.addAttribute(protocolRecent, new long[24]); + } + } + } + + public static String getVertexFqdnSql() { long[] timeLimit = getTimeLimit(); long maxTime = timeLimit[0]; long minTime = timeLimit[1]; @@ -189,7 +223,7 @@ public class ReadClickhouseData { return "SELECT FQDN,MAX( LAST_FOUND_TIME ) AS LAST_FOUND_TIME,MIN( FIRST_FOUND_TIME ) AS FIRST_FOUND_TIME FROM ((" + sslSql + ") UNION ALL (" + httpSql + ")) GROUP BY FQDN HAVING FQDN != ''"; } - public static String getVIpSql() { + public static String getVertexIpSql() { long[] timeLimit = getTimeLimit(); long maxTime = timeLimit[0]; long minTime = timeLimit[1]; @@ -199,23 +233,23 @@ public class ReadClickhouseData { return "SELECT * FROM((" + clientIpSql + ") UNION ALL (" + serverIpSql + "))"; } - public static String getEFqdnAddressIpSql() { + public static String getRelationshipFqdnAddressIpSql() { long[] timeLimit = getTimeLimit(); long maxTime = timeLimit[0]; long minTime = timeLimit[1]; String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime; - String sslSql = "SELECT ssl_sni AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,groupUniqArray(100)(common_client_ip) AS DIST_CIP_RECENT,'SSL' AS common_schema_type FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'SSL' GROUP BY ssl_sni,common_server_ip"; - String httpSql = "SELECT http_host AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,groupUniqArray(100)(common_client_ip) AS DIST_CIP_RECENT,'HTTP' AS common_schema_type FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'HTTP' GROUP BY http_host,common_server_ip"; + String sslSql = "SELECT ssl_sni AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,groupUniqArray(10000)(common_client_ip) AS DIST_CIP_RECENT,'TLS' AS schema_type FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'SSL' GROUP BY ssl_sni,common_server_ip"; + String httpSql = "SELECT http_host AS 
FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,groupUniqArray(10000)(common_client_ip) AS DIST_CIP_RECENT,'HTTP' AS schema_type FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'HTTP' GROUP BY http_host,common_server_ip"; return "SELECT * FROM ((" + sslSql + ") UNION ALL (" + httpSql + "))WHERE FQDN != ''"; } - public static String getEIpVisitFqdnSql() { + public static String getRelationshipIpVisitFqdnSql() { long[] timeLimit = getTimeLimit(); long maxTime = timeLimit[0]; long minTime = timeLimit[1]; String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime; - String httpSql = "SELECT http_host AS FQDN,common_client_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,'HTTP' AS common_schema_type FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'HTTP' GROUP BY http_host,common_client_ip"; - String sslSql = "SELECT ssl_sni AS FQDN,common_client_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,'SSL' AS common_schema_type FROM tsg_galaxy_v3.connection_record_log WHERE common_schema_type = 'SSL' GROUP BY ssl_sni,common_client_ip"; + String httpSql = "SELECT http_host AS FQDN,common_client_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,'HTTP' AS schema_type FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'HTTP' GROUP BY http_host,common_client_ip"; + String sslSql = "SELECT ssl_sni AS FQDN,common_client_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,'TLS' AS schema_type FROM tsg_galaxy_v3.connection_record_log WHERE common_schema_type = 'SSL' GROUP BY ssl_sni,common_client_ip"; return "SELECT * FROM ((" + sslSql + ") UNION ALL (" + httpSql + "))WHERE FQDN != ''"; } diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java index 623b66f..1098b24 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java @@ -2,10 +2,11 @@ package cn.ac.iie.service.read; import cn.ac.iie.utils.ArangoDBConnect; import com.arangodb.ArangoCursor; -import com.arangodb.entity.BaseEdgeDocument; +import com.arangodb.entity.BaseDocument; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; @@ -13,33 +14,61 @@ import java.util.concurrent.ConcurrentHashMap; * @author wlh * 多线程全量读取arangoDb历史数据,封装到map */ -public class ReadHistoryArangoData extends Thread { +public class ReadHistoryArangoData extends Thread { private static final Logger LOG = LoggerFactory.getLogger(ReadHistoryArangoData.class); - private ArangoDBConnect arangoDBConnect; + private ArangoDBConnect arangoConnect; private String query; - private ConcurrentHashMap map; + private ConcurrentHashMap map; + private Class type; + private String collectionName; - public ReadHistoryArangoData(ArangoDBConnect arangoDBConnect, String query, ConcurrentHashMap map) { - this.arangoDBConnect = arangoDBConnect; + public ReadHistoryArangoData(ArangoDBConnect arangoConnect, 
String query, ConcurrentHashMap<String, T> map, Class<T> type, String collectionName) { + this.arangoConnect = arangoConnect; this.query = query; this.map = map; + this.type = type; + this.collectionName = collectionName; } @Override public void run() { long s = System.currentTimeMillis(); - ArangoCursor<BaseEdgeDocument> docs = arangoDBConnect.executorQuery(query, BaseEdgeDocument.class); - if (docs != null){ - List<BaseEdgeDocument> baseDocuments = docs.asListRemaining(); + ArangoCursor<T> docs = arangoConnect.executorQuery(query, type); + if (docs != null) { + List<T> baseDocuments = docs.asListRemaining(); int i = 0; - for (BaseEdgeDocument doc : baseDocuments) { + for (T doc : baseDocuments) { String key = doc.getKey(); + switch (collectionName) { + case "R_LOCATE_FQDN2IP": + case "R_VISIT_IP2FQDN": + updateProtocolDocument(doc); + break; + default: + } map.put(key, doc); i++; } long l = System.currentTimeMillis(); - LOG.info(query+ "\n处理数据" + i + "条,运行时间:" + (l - s)); + LOG.info(query + "\n处理数据" + i + "条,运行时间:" + (l - s)); } } + + private void updateProtocolDocument(T doc) { + if (doc.getProperties().containsKey("PROTOCOL_TYPE")) { + for (String protocol : ReadClickhouseData.protocolSet) { + String protocolRecent = protocol + "_CNT_RECENT"; + ArrayList<Long> cntRecent = (ArrayList<Long>) doc.getAttribute(protocolRecent); + Long[] cntRecentsSrc = cntRecent.toArray(new Long[cntRecent.size()]); + Long[] cntRecentsDst = new Long[24]; + System.arraycopy(cntRecentsSrc, 0, cntRecentsDst, 1, cntRecentsSrc.length - 1); + cntRecentsDst[0] = 0L; + doc.addAttribute(protocolRecent, cntRecentsDst); + } + } + } + } diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/relationship/LocateSubscriber2Ip.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/relationship/LocateSubscriber2Ip.java deleted file mode 100644 index 6f8bff2..0000000 --- a/IP-learning-graph/src/main/java/cn/ac/iie/service/relationship/LocateSubscriber2Ip.java +++ /dev/null @@ -1,38 +0,0 @@ -package cn.ac.iie.service.relationship; - 
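The updateProtocolDocument helper above is the aging half of a 24-slot hourly window: each time the history documents are reloaded, every <PROTOCOL>_CNT_RECENT array is shifted right by one slot and slot 0 is zeroed, so the counts written into slot 0 for the current hour (by checkSchemaProperty for fresh ClickHouse rows and by updateProcotol when merging into history) drop off the window after 24 runs. A minimal self-contained sketch of that shift; the class and variable names here are illustrative and not part of the patch:

    import java.util.Arrays;

    public class RecentWindowSketch {
        /** Shift a 24-slot hourly counter right by one and clear slot 0 for the new hour. */
        static long[] shiftHour(long[] recent) {
            long[] dst = new long[24];
            // slots 0..22 move to 1..23; the oldest hour (slot 23) falls off the window
            System.arraycopy(recent, 0, dst, 1, recent.length - 1);
            dst[0] = 0L;
            return dst;
        }

        public static void main(String[] args) {
            long[] recent = new long[24];
            recent[0] = 42L;                  // counts observed in the hour that just ended
            long[] shifted = shiftHour(recent);
            shifted[0] += 7L;                 // the new hour's counts land in slot 0
            System.out.println(Arrays.toString(shifted)); // [7, 42, 0, 0, ...]
        }
    }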
-import cn.ac.iie.service.update.Relationship; -import cn.ac.iie.utils.ArangoDBConnect; -import com.arangodb.entity.BaseEdgeDocument; - -import java.util.HashMap; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; - -public class VisitIp2Fqdn extends Relationship { - public VisitIp2Fqdn(HashMap> newDocumentHashMap, - ArangoDBConnect arangoManger, - String collectionName, - ConcurrentHashMap historyDocumentMap, - CountDownLatch countDownLatch) { - super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap,countDownLatch); - } -} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Document.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Document.java new file mode 100644 index 0000000..1c9203d --- /dev/null +++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Document.java @@ -0,0 +1,118 @@ +package cn.ac.iie.service.update; + +import cn.ac.iie.config.ApplicationConfig; +import cn.ac.iie.utils.ArangoDBConnect; +import com.arangodb.entity.BaseDocument; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; + +public class Document extends Thread{ + private static final Logger LOG = LoggerFactory.getLogger(Document.class); + private HashMap> newDocumentMap; + private ArangoDBConnect arangoManger; + private String collectionName; + private ConcurrentHashMap historyDocumentMap; + private CountDownLatch countDownLatch; + private Class type; + + Document(HashMap> newDocumentMap, + ArangoDBConnect arangoManger, + String collectionName, + ConcurrentHashMap historyDocumentMap, + CountDownLatch countDownLatch, + Class type) { + this.newDocumentMap = newDocumentMap; + this.arangoManger = arangoManger; + this.collectionName = collectionName; + this.historyDocumentMap = historyDocumentMap; + this.countDownLatch = countDownLatch; + this.type = type; + } + + + @Override + public void run() { + Set keySet = newDocumentMap.keySet(); + ArrayList resultDocumentList = new ArrayList<>(); + int i = 0; + try { + for (String key : keySet) { + ArrayList newDocumentSchemaList = newDocumentMap.getOrDefault(key, null); + if (newDocumentSchemaList != null) { + T newDocument = mergeDocument(newDocumentSchemaList); + i += 1; + T historyDocument = historyDocumentMap.getOrDefault(key, null); + updateDocument(newDocument,historyDocument,resultDocumentList); + if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) { + arangoManger.overwrite(resultDocumentList, collectionName); + LOG.info("更新"+collectionName+":" + i); + i = 0; + } + } + } + if (i != 0) { + arangoManger.overwrite(resultDocumentList, collectionName); + LOG.info("更新"+collectionName+":" + i); + } + } catch (Exception e) { + e.printStackTrace(); + LOG.error(e.toString()); + }finally { + countDownLatch.countDown(); + } + } + + private void updateDocument(T newDocument, T historyDocument, ArrayList resultDocumentList) { + if (historyDocument != null){ + updateFunction(newDocument,historyDocument); + resultDocumentList.add(historyDocument); + }else { + resultDocumentList.add(newDocument); + } + } + + protected void updateFunction(T newDocument, T historyDocument) { + Object lastFoundTime = newDocument.getAttribute("LAST_FOUND_TIME"); + historyDocument.addAttribute("LAST_FOUND_TIME",lastFoundTime); + } + + private T mergeDocument(ArrayList newDocumentSchemaList) throws 
IllegalAccessException, InstantiationException { + if (newDocumentSchemaList == null || newDocumentSchemaList.isEmpty()){ + return null; + }else if (newDocumentSchemaList.size() == 1){ + return newDocumentSchemaList.get(0); + }else { + T newDocument = type.newInstance(); + Map<String, Object> newProperties = newDocument.getProperties(); + for (T doc:newDocumentSchemaList){ + if (newProperties.isEmpty()){ + newDocument = doc; + newProperties = doc.getProperties(); + }else { + mergeFunction(newProperties,doc); + } + } + newDocument.setProperties(newProperties); + return newDocument; + } + } + + protected void mergeFunction(Map<String, Object> newProperties, T lastDoc) { + long firstFoundTime = Long.parseLong(newProperties.getOrDefault("FIRST_FOUND_TIME", 0L).toString()); + long docFirstFoundTime = Long.parseLong(lastDoc.getAttribute("FIRST_FOUND_TIME").toString()); + newProperties.put("FIRST_FOUND_TIME", firstFoundTime < docFirstFoundTime ? firstFoundTime : docFirstFoundTime); + long lastFoundTime = Long.parseLong(newProperties.getOrDefault("LAST_FOUND_TIME", 0L).toString()); + long docLastFoundTime = Long.parseLong(lastDoc.getAttribute("LAST_FOUND_TIME").toString()); + newProperties.put("LAST_FOUND_TIME", lastFoundTime > docLastFoundTime ? lastFoundTime : docLastFoundTime); + } + + +} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Relationship.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Relationship.java index 4482691..29e6ec2 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Relationship.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Relationship.java @@ -1,173 +1,78 @@ package cn.ac.iie.service.update; -import cn.ac.iie.config.ApplicationConfig; +import cn.ac.iie.service.read.ReadClickhouseData; import cn.ac.iie.utils.ArangoDBConnect; import com.arangodb.entity.BaseEdgeDocument; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; -public class Relationship extends Thread { - private static final Logger LOG = LoggerFactory.getLogger(Relationship.class); +public class Relationship extends Document<BaseEdgeDocument> { - private HashMap<String, HashMap<String, BaseEdgeDocument>> newDocumentHashMap; - private ArangoDBConnect arangoManger; - private String collectionName; - private ConcurrentHashMap<String, BaseEdgeDocument> historyDocumentMap; - private CountDownLatch countDownLatch; - - public Relationship(HashMap<String, HashMap<String, BaseEdgeDocument>> newDocumentHashMap, + public Relationship(HashMap<String, ArrayList<BaseEdgeDocument>> newDocumentHashMap, ArangoDBConnect arangoManger, String collectionName, ConcurrentHashMap<String, BaseEdgeDocument> historyDocumentMap, CountDownLatch countDownLatch) { - this.newDocumentHashMap = newDocumentHashMap; - this.arangoManger = arangoManger; - this.collectionName = collectionName; - this.historyDocumentMap = historyDocumentMap; - this.countDownLatch = countDownLatch; + super(newDocumentHashMap,arangoManger,collectionName,historyDocumentMap,countDownLatch,BaseEdgeDocument.class); } @Override - public void run() { - Set<String> keySet = newDocumentHashMap.keySet(); - ArrayList<BaseEdgeDocument> docInsert = new ArrayList<>(); - int i = 0; - try { - for (String key : keySet) { - HashMap<String, BaseEdgeDocument> newEdgeDocumentSchemaMap = newDocumentHashMap.getOrDefault(key, null); - if (newEdgeDocumentSchemaMap != null) { - BaseEdgeDocument newEdgeDocument = mergeRelationship(newEdgeDocumentSchemaMap); - i += 1; - BaseEdgeDocument historyEdgeDocument = historyDocumentMap.getOrDefault(key, null); - updateRelationship(newEdgeDocument,historyEdgeDocument,docInsert); - if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) { - arangoManger.overwrite(docInsert, collectionName); - LOG.info("更新"+collectionName+":" + i); - i = 0; - } - } - } - if (i != 0) { - arangoManger.overwrite(docInsert, collectionName); - LOG.info("更新"+collectionName+":" + i); - } }
catch (Exception e) { - e.printStackTrace(); - LOG.error(e.toString()); - }finally { - countDownLatch.countDown(); - } + protected void updateFunction(BaseEdgeDocument newEdgeDocument, BaseEdgeDocument historyEdgeDocument){ + super.updateFunction(newEdgeDocument,historyEdgeDocument); } - protected BaseEdgeDocument mergeRelationship(HashMap newEdgeDocumentSchemaMap) { - BaseEdgeDocument newBaseEdgeDocument = new BaseEdgeDocument(); - Set schemaSets = newEdgeDocumentSchemaMap.keySet(); - Map properties = newBaseEdgeDocument.getProperties(); - - for (String schema : schemaSets) { - BaseEdgeDocument schemaEdgeDoc = newEdgeDocumentSchemaMap.get(schema); - if (!properties.isEmpty()) { - mergeFunction(properties, schemaEdgeDoc); - } else { - newBaseEdgeDocument = schemaEdgeDoc; - properties = schemaEdgeDoc.getProperties(); - } - setSchemaCount(schema, schemaEdgeDoc, properties); - } - properties.remove("COUNT_TOTAL"); - checkSchemaProperty(properties); - - newBaseEdgeDocument.setProperties(properties); - return newBaseEdgeDocument; - } - - private void updateRelationship(BaseEdgeDocument newEdgeDocument,BaseEdgeDocument historyEdgeDocument,ArrayList docInsert){ - if (historyEdgeDocument != null && newEdgeDocument != null) { - updateFunction(newEdgeDocument, historyEdgeDocument); - docInsert.add(historyEdgeDocument); - } else { - docInsert.add(newEdgeDocument); - } - } - - protected void updateFunction(BaseEdgeDocument newEdgeDocument,BaseEdgeDocument historyEdgeDocument){ - updateFoundTime(newEdgeDocument,historyEdgeDocument); - setSchemaCntByHistory(historyEdgeDocument,"TLS_CNT_RECENT","TLS_CNT_TOTAL",newEdgeDocument); - setSchemaCntByHistory(historyEdgeDocument,"HTTP_CNT_RECENT","HTTP_CNT_TOTAL",newEdgeDocument); - setSchemaCntByHistory(historyEdgeDocument,"DNS_CNT_RECENT","DNS_CNT_TOTAL",newEdgeDocument); - } - - protected void updateFoundTime(BaseEdgeDocument newEdgeDocument,BaseEdgeDocument historyEdgeDocument){ - Object lastFoundTime = newEdgeDocument.getAttribute("LAST_FOUND_TIME"); - historyEdgeDocument.addAttribute("LAST_FOUND_TIME", lastFoundTime); - } - - private void setSchemaCntByHistory(BaseEdgeDocument historyEdgeDocument,String schema,String totalSchema,BaseEdgeDocument newEdgeDocument){ + protected void updateProcotol(BaseEdgeDocument historyEdgeDocument, String schema, BaseEdgeDocument newEdgeDocument){ + String recentSchema = schema +"_CNT_RECENT"; + String totalSchema = schema + "_CNT_TOTAL"; long countTotal = Long.parseLong(newEdgeDocument.getAttribute(totalSchema).toString()); - long updateCountTotal = Long.parseLong(historyEdgeDocument.getAttribute(totalSchema).toString()); + if (countTotal > 0L){ + long updateCountTotal = Long.parseLong(historyEdgeDocument.getAttribute(totalSchema).toString()); - ArrayList cntRecent = (ArrayList) historyEdgeDocument.getAttribute(schema); - Long[] cntRecentsSrc = cntRecent.toArray(new Long[cntRecent.size()]); - Long[] cntRecentsDst = new Long[24]; - System.arraycopy(cntRecentsSrc, 0, cntRecentsDst, 1, cntRecentsSrc.length - 1); - cntRecentsDst[0] = countTotal; + Long[] cntRecent = (Long[]) historyEdgeDocument.getAttribute(recentSchema); + cntRecent[0] = countTotal; - historyEdgeDocument.addAttribute(schema, cntRecentsDst); - historyEdgeDocument.addAttribute(totalSchema, countTotal + updateCountTotal); - } - - protected void mergeFunction(Map properties, BaseEdgeDocument schemaEdgeDoc) { - mergeFoundTime(properties, schemaEdgeDoc); - } - - private void mergeFoundTime(Map properties, BaseEdgeDocument schemaEdgeDoc) { - long 
schemaFirstFoundTime = Long.parseLong(schemaEdgeDoc.getAttribute("FIRST_FOUND_TIME").toString()); - long firstFoundTime = Long.parseLong(properties.get("FIRST_FOUND_TIME").toString()); - properties.put("FIRST_FOUND_TIME", schemaFirstFoundTime < firstFoundTime ? schemaFirstFoundTime : firstFoundTime); - long schemaLastFoundTime = Long.parseLong(schemaEdgeDoc.getAttribute("LAST_FOUND_TIME").toString()); - long lastFoundTime = Long.parseLong(properties.get("LAST_FOUND_TIME").toString()); - properties.put("LAST_FOUND_TIME", schemaLastFoundTime > lastFoundTime ? schemaLastFoundTime : lastFoundTime); - } - - private void setSchemaCount(String schema, BaseEdgeDocument schemaEdgeDoc, Map properties) { - switch (schema) { - case "HTTP": - long httpCntTotal = Long.parseLong(schemaEdgeDoc.getAttribute("COUNT_TOTAL").toString()); - properties.put("HTTP_CNT_TOTAL", httpCntTotal); - long[] httpCntRecentsDst = new long[24]; - httpCntRecentsDst[0] = httpCntTotal; - properties.put("HTTP_CNT_RECENT", httpCntRecentsDst); - break; - case "SSL": - long tlsCntTotal = Long.parseLong(schemaEdgeDoc.getAttribute("COUNT_TOTAL").toString()); - properties.put("TLS_CNT_TOTAL", tlsCntTotal); - long[] tlsCntRecentsDst = new long[24]; - tlsCntRecentsDst[0] = tlsCntTotal; - properties.put("TLS_CNT_RECENT", tlsCntRecentsDst); - break; - default: - break; + historyEdgeDocument.addAttribute(recentSchema, cntRecent); + historyEdgeDocument.addAttribute(totalSchema, countTotal + updateCountTotal); + String hisProtocolType = historyEdgeDocument.getAttribute("PROTOCOL_TYPE").toString(); + if (!hisProtocolType.contains(schema)){ + hisProtocolType = hisProtocolType + "," + schema; + historyEdgeDocument.addAttribute("PROTOCOL_TYPE",hisProtocolType); + } } } - private void checkSchemaProperty(Map properties){ - if (!properties.containsKey("TLS_CNT_TOTAL")){ - properties.put("TLS_CNT_TOTAL",0L); - properties.put("TLS_CNT_RECENT",new long[24]); - } - if (!properties.containsKey("HTTP_CNT_TOTAL")){ - properties.put("HTTP_CNT_TOTAL",0L); - properties.put("HTTP_CNT_RECENT",new long[24]); - } - if (!properties.containsKey("DNS_CNT_TOTAL")){ - properties.put("DNS_CNT_TOTAL",0L); - properties.put("DNS_CNT_RECENT",new long[24]); + @Override + protected void mergeFunction(Map newProperties, BaseEdgeDocument lastDoc) { + super.mergeFunction(newProperties, lastDoc); + } + + protected void mergeProtocol(Map newProperties, BaseEdgeDocument lastDoc) { + String schema = lastDoc.getAttribute("PROTOCOL_TYPE").toString(); + if (ReadClickhouseData.protocolSet.contains(schema)){ + setProtocolProperties(schema,newProperties,lastDoc); } } + private void setProtocolProperties(String protocol,Map newProperties, BaseEdgeDocument lastDoc){ + String protocolRecent = protocol +"_CNT_RECENT"; + String protocolTotal = protocol + "_CNT_TOTAL"; + long httpCntTotal = Long.parseLong(lastDoc.getAttribute(protocolTotal).toString()); + newProperties.put(protocolTotal, httpCntTotal); + long[] httpCntRecents = (long[]) lastDoc.getAttribute(protocolRecent); + newProperties.put(protocolRecent, httpCntRecents); + String protocolType = newProperties.get("PROTOCOL_TYPE").toString(); + newProperties.put("PROTOCOL_TYPE",addProcotolType(protocolType,protocol)); + } + + private String addProcotolType(String protocolType,String schema){ + if (!protocolType.contains(schema)){ + protocolType = protocolType + "," + schema; + } + return protocolType; + } } diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Vertex.java 
b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Vertex.java index e68f466..322a995 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Vertex.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Vertex.java @@ -1,16 +1,11 @@ package cn.ac.iie.service.update; -import cn.ac.iie.config.ApplicationConfig; import cn.ac.iie.utils.ArangoDBConnect; import com.arangodb.entity.BaseDocument; -import com.arangodb.entity.BaseEdgeDocument; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -18,109 +13,29 @@ import java.util.concurrent.CountDownLatch; * @author wlh * 多线程更新vertex数据 */ -public class Vertex extends Thread{ - private static final Logger LOG = LoggerFactory.getLogger(Vertex.class); +public class Vertex extends Document { - private HashMap> newDocumentHashMap; - private ArangoDBConnect arangoManger; - private String collectionName; - private ConcurrentHashMap historyDocumentMap; - private CountDownLatch countDownLatch; - - public Vertex(HashMap> newDocumentHashMap, + public Vertex(HashMap> newDocumentHashMap, ArangoDBConnect arangoManger, String collectionName, - ConcurrentHashMap historyDocumentMap, - CountDownLatch countDownLatch){ - this.newDocumentHashMap = newDocumentHashMap; - this.arangoManger = arangoManger; - this.collectionName = collectionName; - this.historyDocumentMap = historyDocumentMap; - this.countDownLatch = countDownLatch; + ConcurrentHashMap historyDocumentMap, + CountDownLatch countDownLatch) { + super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap, countDownLatch,BaseDocument.class); + } + + @Override + protected void updateFunction(BaseDocument newDocument, BaseDocument historyDocument) { + super.updateFunction(newDocument, historyDocument); + } + + @Override + protected void mergeFunction(Map properties, BaseDocument doc) { + super.mergeFunction(properties, doc); } @Override public void run() { - Set keySet = newDocumentHashMap.keySet(); - ArrayList docInsert = new ArrayList<>(); - int i = 0; - try { - for (String key:keySet){ - ArrayList documentArrayList = newDocumentHashMap.getOrDefault(key, null); - BaseDocument newDocument = mergeVertex(documentArrayList); - if (newDocument != null){ - i += 1; - BaseDocument historyDocument = historyDocumentMap.getOrDefault(key, null); - updateVertex(newDocument,historyDocument,docInsert); - } - if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH){ - arangoManger.overwrite(docInsert,collectionName); - LOG.info("更新"+collectionName+":"+i); - i = 0; - } - } - if (i != 0){ - arangoManger.overwrite(docInsert,collectionName); - LOG.info("更新"+collectionName+":"+i); - } - }catch (Exception e){ - e.printStackTrace(); - LOG.error(e.toString()); - }finally { - countDownLatch.countDown(); - } + super.run(); } - private void updateVertex(BaseDocument newDocument,BaseDocument historyDocument,ArrayList docInsert){ - if (historyDocument != null){ - updateFunction(newDocument,historyDocument); - docInsert.add(historyDocument); - }else { - docInsert.add(newDocument); - } - } - - protected void updateFunction(BaseDocument newDocument,BaseDocument historyDocument){ - updateFoundTime(newDocument,historyDocument); - } - - private void updateFoundTime(BaseDocument newDocument,BaseDocument historyDocument){ - Object lastFoundTime = newDocument.getAttribute("LAST_FOUND_TIME"); - 
historyDocument.addAttribute("LAST_FOUND_TIME",lastFoundTime); - } - - private BaseDocument mergeVertex(ArrayList<BaseDocument> documentArrayList){ - if (documentArrayList == null || documentArrayList.isEmpty()){ - return null; - }else if (documentArrayList.size() == 1){ - return documentArrayList.get(0); - }else { - BaseDocument document = new BaseDocument(); - Map<String, Object> properties = document.getProperties(); - for (BaseDocument doc:documentArrayList){ - if (properties.isEmpty()){ - document = doc; - properties = doc.getProperties(); - }else { - mergeFunction(properties,doc); - } - } - document.setProperties(properties); - return document; - } - } - - protected void mergeFunction(Map<String, Object> properties,BaseDocument doc){ - mergeFoundTime(properties,doc); - } - - private void mergeFoundTime(Map<String, Object> properties,BaseDocument doc){ - long firstFoundTime = Long.parseLong(properties.getOrDefault("FIRST_FOUND_TIME", 0L).toString()); - long docFirstFoundTime = Long.parseLong(doc.getAttribute("FIRST_FOUND_TIME").toString()); - properties.put("FIRST_FOUND_TIME", firstFoundTime < docFirstFoundTime ? firstFoundTime : docFirstFoundTime); - long lastFoundTime = Long.parseLong(properties.getOrDefault("LAST_FOUND_TIME", 0L).toString()); - long docLastFoundTime = Long.parseLong(doc.getAttribute("LAST_FOUND_TIME").toString()); - properties.put("LAST_FOUND_TIME", lastFoundTime > docLastFoundTime ? lastFoundTime : docLastFoundTime); - } } diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/relationship/LocateFqdn2Ip.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/relationship/LocateFqdn2Ip.java similarity index 66% rename from IP-learning-graph/src/main/java/cn/ac/iie/service/relationship/LocateFqdn2Ip.java rename to IP-learning-graph/src/main/java/cn/ac/iie/service/update/relationship/LocateFqdn2Ip.java index f3551b3..9f95b51 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/service/relationship/LocateFqdn2Ip.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/relationship/LocateFqdn2Ip.java @@ -1,4 +1,4 @@ -package cn.ac.iie.service.relationship; +package cn.ac.iie.service.update.relationship; import cn.ac.iie.service.read.ReadClickhouseData; import cn.ac.iie.service.update.Relationship; @@ -11,7 +11,7 @@ import java.util.concurrent.CountDownLatch; public class LocateFqdn2Ip extends Relationship { - public LocateFqdn2Ip(HashMap<String, HashMap<String, BaseEdgeDocument>> newDocumentHashMap, + public LocateFqdn2Ip(HashMap<String, ArrayList<BaseEdgeDocument>> newDocumentHashMap, ArangoDBConnect arangoManger, String collectionName, ConcurrentHashMap<String, BaseEdgeDocument> historyDocumentMap, @@ -22,25 +22,16 @@ public class LocateFqdn2Ip extends Relationship { @Override protected void mergeFunction(Map<String, Object> properties, BaseEdgeDocument schemaEdgeDoc){ super.mergeFunction(properties,schemaEdgeDoc); - mergeDistinctClientIp(properties,schemaEdgeDoc); + mergeProtocol(properties, schemaEdgeDoc); } @Override protected void updateFunction(BaseEdgeDocument newEdgeDocument, BaseEdgeDocument historyEdgeDocument) { super.updateFunction(newEdgeDocument, historyEdgeDocument); - updateDistinctClientIp(newEdgeDocument, historyEdgeDocument); - } - - private void mergeDistinctClientIp(Map<String, Object> properties, BaseEdgeDocument schemaEdgeDoc){ - String[] schemaDistCipRecents = (String[]) schemaEdgeDoc.getAttribute("DIST_CIP"); - String[] distCipRecents = (String[]) properties.get("DIST_CIP"); - Object[] mergeClientIp = distinctIp(schemaDistCipRecents, distCipRecents); - long[] mergeClientIpTs = new long[mergeClientIp.length]; - for (int i = 0;i < mergeClientIpTs.length;i++){ - mergeClientIpTs[i] = ReadClickhouseData.currentHour; + for (String schema:ReadClickhouseData.protocolSet){ + updateProcotol(historyEdgeDocument,schema,newEdgeDocument); } - properties.put("DIST_CIP", mergeClientIp); - properties.put("DIST_CIP_TS",mergeClientIpTs); + updateDistinctClientIp(newEdgeDocument, historyEdgeDocument); }
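The updateProcotol call in the loop above folds one hour of new counts into the history edge for each protocol in protocolSet (HTTP, TLS, DNS): the hour's count goes into slot 0 of <PROTOCOL>_CNT_RECENT, is added onto <PROTOCOL>_CNT_TOTAL, and the protocol name is appended to the comma-separated PROTOCOL_TYPE attribute if it is not already listed. A rough stand-alone sketch of that merge using a plain map in place of a BaseEdgeDocument; names are illustrative:

    import java.util.HashMap;
    import java.util.Map;

    public class ProtocolMergeSketch {
        /** Fold newCount for one protocol into a history record held as a map. */
        static void mergeProtocol(Map<String, Object> history, String protocol, long newCount) {
            if (newCount <= 0L) {
                return; // this protocol was not seen this hour
            }
            long[] recent = (long[]) history.get(protocol + "_CNT_RECENT");
            recent[0] = newCount; // slot 0 was zeroed by the hourly shift
            long total = (long) history.get(protocol + "_CNT_TOTAL");
            history.put(protocol + "_CNT_TOTAL", total + newCount);
            String types = (String) history.get("PROTOCOL_TYPE");
            if (!types.contains(protocol)) {
                history.put("PROTOCOL_TYPE", types + "," + protocol);
            }
        }

        public static void main(String[] args) {
            Map<String, Object> history = new HashMap<>();
            history.put("HTTP_CNT_RECENT", new long[24]);
            history.put("HTTP_CNT_TOTAL", 10L);
            history.put("PROTOCOL_TYPE", "TLS");
            mergeProtocol(history, "HTTP", 5L);
            System.out.println(history.get("PROTOCOL_TYPE")); // TLS,HTTP
        }
    }

The substring-based contains check mirrors the patch; it is only safe while no protocol name is a prefix of another, which holds for the current HTTP/TLS/DNS set.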
private void updateDistinctClientIp(BaseEdgeDocument newEdgeDocument,BaseEdgeDocument edgeDocument){ @@ -54,7 +45,7 @@ public class LocateFqdn2Ip extends Relationship { } Object[] distCipRecent = (Object[])newEdgeDocument.getAttribute("DIST_CIP"); for (Object cip:distCipRecent){ - distCipToTs.put(cip.toString(),ReadClickhouseData.currentHour); + distCipToTs.put(cip.toString(), ReadClickhouseData.currentHour); } Map<String, Long> sortDistCip = sortMapByValue(distCipToTs); @@ -74,8 +65,8 @@ public class LocateFqdn2Ip extends Relationship { List<Map.Entry<String, Long>> entryList = new ArrayList<>(oriMap.entrySet()); entryList.sort((o1, o2) -> o2.getValue().compareTo(o1.getValue())); - if(entryList.size() > 100){ - for(Map.Entry<String, Long> set:entryList.subList(0, 100)){ + if(entryList.size() > 10000){ + for(Map.Entry<String, Long> set:entryList.subList(0, 10000)){ sortedMap.put(set.getKey(), set.getValue()); } }else { @@ -86,15 +77,4 @@ public class LocateFqdn2Ip extends Relationship { return sortedMap; } - private Object[] distinctIp(Object[] distCipTotalsSrc,Object[] distCipRecentsSrc){ - HashSet<Object> dIpSet = new HashSet<>(); - dIpSet.addAll(Arrays.asList(distCipRecentsSrc)); - dIpSet.addAll(Arrays.asList(distCipTotalsSrc)); - Object[] distCipTotals = dIpSet.toArray(); - if (distCipTotals.length > 100) { - System.arraycopy(distCipTotals, 0, distCipTotals, 0, 100); - } - return distCipTotals; - } - } diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/relationship/LocateSubscriber2Ip.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/relationship/LocateSubscriber2Ip.java new file mode 100644 index 0000000..5ca4cb0 --- /dev/null +++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/relationship/LocateSubscriber2Ip.java @@ -0,0 +1,21 @@ +package cn.ac.iie.service.update.relationship; + +import cn.ac.iie.service.update.Relationship; +import cn.ac.iie.utils.ArangoDBConnect; +import com.arangodb.entity.BaseEdgeDocument; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; + +public class LocateSubscriber2Ip extends Relationship { + + public LocateSubscriber2Ip(HashMap<String, ArrayList<BaseEdgeDocument>> newDocumentHashMap, + ArangoDBConnect arangoManger, + String collectionName, + ConcurrentHashMap<String, BaseEdgeDocument> historyDocumentMap, + CountDownLatch countDownLatch) { + super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap, countDownLatch); + } +} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/relationship/VisitIp2Fqdn.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/relationship/VisitIp2Fqdn.java new file mode 100644 index 0000000..f283d84 --- /dev/null +++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/relationship/VisitIp2Fqdn.java @@ -0,0 +1,36 @@ +package cn.ac.iie.service.update.relationship; + +import cn.ac.iie.service.read.ReadClickhouseData; +import cn.ac.iie.service.update.Relationship; +import cn.ac.iie.utils.ArangoDBConnect; +import com.arangodb.entity.BaseEdgeDocument; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; + +public class VisitIp2Fqdn extends Relationship { + public VisitIp2Fqdn(HashMap<String, ArrayList<BaseEdgeDocument>> newDocumentHashMap, + ArangoDBConnect arangoManger, + String collectionName, + ConcurrentHashMap<String, BaseEdgeDocument> historyDocumentMap, + CountDownLatch countDownLatch) { + super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap,countDownLatch); + } + + @Override + protected void 
+    protected void updateFunction(BaseEdgeDocument newEdgeDocument, BaseEdgeDocument historyEdgeDocument) {
+        super.updateFunction(newEdgeDocument, historyEdgeDocument);
+        for (String schema: ReadClickhouseData.protocolSet){
+            updateProcotol(historyEdgeDocument,schema,newEdgeDocument);
+        }
+    }
+
+    @Override
+    protected void mergeFunction(Map<String, Object> newProperties, BaseEdgeDocument lastDoc) {
+        super.mergeFunction(newProperties, lastDoc);
+        mergeProtocol(newProperties, lastDoc);
+    }
+}
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/vertex/Fqdn.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/vertex/Fqdn.java
similarity index 79%
rename from IP-learning-graph/src/main/java/cn/ac/iie/service/vertex/Fqdn.java
rename to IP-learning-graph/src/main/java/cn/ac/iie/service/update/vertex/Fqdn.java
index 4251910..c13ca8c 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/service/vertex/Fqdn.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/vertex/Fqdn.java
@@ -1,9 +1,8 @@
-package cn.ac.iie.service.vertex;
+package cn.ac.iie.service.update.vertex;
 
 import cn.ac.iie.service.update.Vertex;
 import cn.ac.iie.utils.ArangoDBConnect;
 import com.arangodb.entity.BaseDocument;
-import com.arangodb.entity.BaseEdgeDocument;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -15,7 +14,7 @@ public class Fqdn extends Vertex {
     public Fqdn(HashMap<String, ArrayList<BaseDocument>> newDocumentHashMap,
                 ArangoDBConnect arangoManger,
                 String collectionName,
-                ConcurrentHashMap<String, BaseEdgeDocument> historyDocumentMap,
+                ConcurrentHashMap<String, BaseDocument> historyDocumentMap,
                 CountDownLatch countDownLatch) {
         super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap,countDownLatch);
     }
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/vertex/Ip.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/vertex/Ip.java
similarity index 89%
rename from IP-learning-graph/src/main/java/cn/ac/iie/service/vertex/Ip.java
rename to IP-learning-graph/src/main/java/cn/ac/iie/service/update/vertex/Ip.java
index daa53f7..4cdedbd 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/service/vertex/Ip.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/vertex/Ip.java
@@ -1,9 +1,8 @@
-package cn.ac.iie.service.vertex;
+package cn.ac.iie.service.update.vertex;
 
 import cn.ac.iie.service.update.Vertex;
 import cn.ac.iie.utils.ArangoDBConnect;
 import com.arangodb.entity.BaseDocument;
-import com.arangodb.entity.BaseEdgeDocument;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -16,11 +15,17 @@ public class Ip extends Vertex {
     public Ip(HashMap<String, ArrayList<BaseDocument>> newDocumentHashMap,
               ArangoDBConnect arangoManger,
               String collectionName,
-              ConcurrentHashMap<String, BaseEdgeDocument> historyDocumentMap,
+              ConcurrentHashMap<String, BaseDocument> historyDocumentMap,
              CountDownLatch countDownLatch) {
         super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap,countDownLatch);
     }
 
+    @Override
+    protected void updateFunction(BaseDocument newDocument, BaseDocument historyDocument) {
+        super.updateFunction(newDocument, historyDocument);
+        updateIpByType(newDocument, historyDocument);
+    }
+
     @Override
     protected void mergeFunction(Map<String, Object> properties, BaseDocument doc) {
         super.mergeFunction(properties, doc);
@@ -40,8 +45,8 @@
             if (!properties.containsKey(property)){
                 properties.put(property,0L);
                 checkIpTypeProperty(properties,mergeProperties,property);
-            }else if (properties.get(property).toString().equals("0") && mergeProperties.containsKey(property)){
-                if (!mergeProperties.get(property).toString().equals("0")){
("0".equals(properties.get(property).toString()) && mergeProperties.containsKey(property)){ + if (!"0".equals(mergeProperties.get(property).toString())){ properties.put(property,Long.parseLong(mergeProperties.get(property).toString())); } } @@ -50,12 +55,6 @@ public class Ip extends Vertex { } } - @Override - protected void updateFunction(BaseDocument newDocument, BaseDocument historyDocument) { - super.updateFunction(newDocument, historyDocument); - updateIpByType(newDocument, historyDocument); - } - private void updateIpByType(BaseDocument newDocument, BaseDocument historyDocument){ addProperty(newDocument,historyDocument,"CLIENT_SESSION_COUNT"); addProperty(newDocument,historyDocument,"CLIENT_BYTES_SUM"); diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/vertex/Subscriber.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/vertex/Subscriber.java similarity index 79% rename from IP-learning-graph/src/main/java/cn/ac/iie/service/vertex/Subscriber.java rename to IP-learning-graph/src/main/java/cn/ac/iie/service/update/vertex/Subscriber.java index 597c6c6..02f1468 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/service/vertex/Subscriber.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/vertex/Subscriber.java @@ -1,9 +1,8 @@ -package cn.ac.iie.service.vertex; +package cn.ac.iie.service.update.vertex; import cn.ac.iie.service.update.Vertex; import cn.ac.iie.utils.ArangoDBConnect; import com.arangodb.entity.BaseDocument; -import com.arangodb.entity.BaseEdgeDocument; import java.util.ArrayList; import java.util.HashMap; @@ -15,7 +14,7 @@ public class Subscriber extends Vertex { public Subscriber(HashMap> newDocumentHashMap, ArangoDBConnect arangoManger, String collectionName, - ConcurrentHashMap historyDocumentMap, + ConcurrentHashMap historyDocumentMap, CountDownLatch countDownLatch) { super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap, countDownLatch); } diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java b/IP-learning-graph/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java index 60e3b26..e81e7a8 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java @@ -108,7 +108,6 @@ public class ArangoDBConnect { } }catch (Exception e){ LOG.error("更新失败:"+e.toString()); -// clean(); }finally { docOverwrite.clear(); } diff --git a/IP-learning-graph/src/main/resources/application.properties b/IP-learning-graph/src/main/resources/application.properties index 58749b2..1d2be99 100644 --- a/IP-learning-graph/src/main/resources/application.properties +++ b/IP-learning-graph/src/main/resources/application.properties @@ -1,5 +1,5 @@ #arangoDB参数配置 -arangoDB.host=192.168.40.127 +arangoDB.host=192.168.40.182 arangoDB.port=8529 arangoDB.user=root arangoDB.password=111111 @@ -13,6 +13,5 @@ update.arango.batch=10000 thread.pool.number=10 thread.await.termination.time=10 -read.clickhouse.max.time=1594809098 -#read.clickhouse.min.time=1594622638 -read.clickhouse.min.time=1593792000 \ No newline at end of file +read.clickhouse.max.time=1594981808 +read.clickhouse.min.time=1593878400 \ No newline at end of file