diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java index 68ee89e..92ac31c 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java @@ -1,7 +1,7 @@ package cn.ac.iie.dao; import cn.ac.iie.config.ApplicationConfig; -import cn.ac.iie.etl.read.ReadHistoryArangoData; +import cn.ac.iie.service.read.ReadHistoryArangoData; import cn.ac.iie.utils.ArangoDBConnect; import cn.ac.iie.utils.ExecutorThreadPool; import com.arangodb.ArangoCursor; @@ -15,10 +15,12 @@ import java.util.concurrent.ConcurrentHashMap; public class BaseArangoData { private static final Logger LOG = LoggerFactory.getLogger(BaseArangoData.class); - public static ConcurrentHashMap v_Fqdn_Map = new ConcurrentHashMap<>(); - public static ConcurrentHashMap v_Ip_Map = new ConcurrentHashMap<>(); - public static ConcurrentHashMap e_Fqdn_Address_Ip_Map = new ConcurrentHashMap<>(); - public static ConcurrentHashMap e_Ip_Visit_Fqdn_Map = new ConcurrentHashMap<>(); + static ConcurrentHashMap v_Fqdn_Map = new ConcurrentHashMap<>(); + static ConcurrentHashMap v_Ip_Map = new ConcurrentHashMap<>(); + static ConcurrentHashMap v_Subscriber_Map = new ConcurrentHashMap<>(); + static ConcurrentHashMap e_Fqdn_Address_Ip_Map = new ConcurrentHashMap<>(); + static ConcurrentHashMap e_Ip_Visit_Fqdn_Map = new ConcurrentHashMap<>(); + static ConcurrentHashMap e_Subsciber_Locate_Ip_Map = new ConcurrentHashMap<>(); private static final ArangoDBConnect arangoDBConnect = ArangoDBConnect.getInstance(); @@ -28,14 +30,18 @@ public class BaseArangoData { long startA = System.currentTimeMillis(); readHistoryData("FQDN", v_Fqdn_Map); readHistoryData("IP", v_Ip_Map); + readHistoryData("SUBSCRIBER",v_Subscriber_Map); readHistoryData("R_LOCATE_FQDN2IP", e_Fqdn_Address_Ip_Map); readHistoryData("R_VISIT_IP2FQDN", e_Ip_Visit_Fqdn_Map); + readHistoryData("R_LOCATE_SUBSCRIBER2IP",e_Subsciber_Locate_Ip_Map); threadPool.shutdown(); threadPool.awaitThreadTask(); - LOG.info("v_Fqdn_Map大小:"+BaseArangoData.v_Fqdn_Map.size()); - LOG.info("v_Ip_Map大小:"+BaseArangoData.v_Ip_Map.size()); - LOG.info("e_Fqdn_Address_Ip_Map大小:"+BaseArangoData.e_Fqdn_Address_Ip_Map.size()); - LOG.info("e_Ip_Visit_Fqdn_Map大小:"+BaseArangoData.e_Ip_Visit_Fqdn_Map.size()); + LOG.info("v_Fqdn_Map大小:"+v_Fqdn_Map.size()); + LOG.info("v_Ip_Map大小:"+v_Ip_Map.size()); + LOG.info("v_Subscriber_Map大小:"+v_Subscriber_Map.size()); + LOG.info("e_Fqdn_Address_Ip_Map大小:"+e_Fqdn_Address_Ip_Map.size()); + LOG.info("e_Ip_Visit_Fqdn_Map大小:"+e_Ip_Visit_Fqdn_Map.size()); + LOG.info("e_Subsciber_Locate_Ip_Map大小:"+e_Subsciber_Locate_Ip_Map.size()); long lastA = System.currentTimeMillis(); LOG.info("读取ArangoDb时间:"+(lastA - startA)); } diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java index a08679d..02bb2d8 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java @@ -14,23 +14,28 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.Map; -import static cn.ac.iie.etl.read.ReadClickhouseData.*; +import static cn.ac.iie.service.read.ReadClickhouseData.*; +/** + * 读取clickhouse数据,封装到map + */ public class BaseClickhouseData { private static final Logger LOG = LoggerFactory.getLogger(BaseClickhouseData.class); private static final ClickhouseConnect manger = ClickhouseConnect.getInstance(); static HashMap>> vFqdnMap = new HashMap<>(); static HashMap>> vIpMap = new HashMap<>(); + static HashMap>> vSubscriberMap = new HashMap<>(); static HashMap>> eFqdnAddressIpMap = new HashMap<>(); static HashMap>> eIpVisitFqdnMap = new HashMap<>(); + static HashMap>> eSubsciberLocateIpMap = new HashMap<>(); private DruidPooledConnection connection; private Statement statement; - public void BaseVFqdn() { + void BaseVFqdn() { initializeVertexMap(vFqdnMap); - LOG.info("V_FQDN resultMap初始化完成"); + LOG.info("FQDN resultMap初始化完成"); String sql = getVFqdnSql(); long start = System.currentTimeMillis(); try { @@ -57,9 +62,9 @@ public class BaseClickhouseData { } } - public void BaseVIp() { + void BaseVIp() { initializeVertexMap(vIpMap); - LOG.info("V_IP resultMap初始化完成"); + LOG.info("IP resultMap初始化完成"); String sql = getVIpSql(); long start = System.currentTimeMillis(); try { @@ -84,9 +89,61 @@ public class BaseClickhouseData { } } - public void BaseEFqdnAddressIp() { + void BaseVertexSubscriber(){ + initializeVertexMap(vSubscriberMap); + LOG.info("SUBSCRIBER resultMap初始化完成"); + String sql = getVertexSubscriberSql(); + long start = System.currentTimeMillis(); + try { + connection = manger.getConnection(); + statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery(sql); + while (resultSet.next()){ + BaseDocument newDoc = getVertexSubscriberDocument(resultSet); + String key = newDoc.getKey(); + int i = Math.abs(key.hashCode()) % ApplicationConfig.THREAD_POOL_NUMBER; + HashMap> documentHashMap = vSubscriberMap.getOrDefault(i, new HashMap<>()); + ArrayList documentArrayList = documentHashMap.getOrDefault(key, new ArrayList<>()); + documentArrayList.add(newDoc); + documentHashMap.put(key,documentArrayList); + } + long last = System.currentTimeMillis(); + LOG.info(sql + "\n读取clickhouse v_SUBSCRIBER时间:" + (last - start)); + }catch (Exception e){ + LOG.error(sql + "\n读取clickhouse v_SUBSCRIBER失败"); + e.printStackTrace(); + }finally { + manger.clear(statement,connection); + } + } + + void BaseRelationshipSubscriberLocateIp(){ + initializeVertexMap(eSubsciberLocateIpMap); + LOG.info("R_LOCATE_SUBSCRIBER2IP"); + String sql = getRelationshipSubsciberLocateIpSql(); + long start = System.currentTimeMillis(); + try { + connection = manger.getConnection(); + statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery(sql); + while (resultSet.next()){ + BaseEdgeDocument newDoc = getRelationshipSubsciberLocateIpDocument(resultSet); + String key = newDoc.getKey(); + putMapByHashcode(resultSet, newDoc, eSubsciberLocateIpMap,key); + } + long last = System.currentTimeMillis(); + LOG.info(sql + "\n读取clickhouse ESubsciberLocateIp时间:" + (last - start)); + }catch (Exception e){ + LOG.error(sql + "\n读取clickhouse ESubsciberLocateIp失败"); + e.printStackTrace(); + }finally { + manger.clear(statement,connection); + } + } + + void BaseEFqdnAddressIp() { initializeVertexMap(eFqdnAddressIpMap); - LOG.info("E_ADDRESS_V_FQDN_TO_V_IP resultMap初始化完成"); + LOG.info("R_LOCATE_FQDN2IP resultMap初始化完成"); String sql = getEFqdnAddressIpSql(); long start = System.currentTimeMillis(); try { @@ -96,7 +153,8 @@ public class BaseClickhouseData { while (resultSet.next()) { BaseEdgeDocument newDoc = getRelationFqdnAddressIpDocument(resultSet); - putMapByHashcode(resultSet, newDoc, eFqdnAddressIpMap); + String commonSchemaType = resultSet.getString("common_schema_type"); + putMapByHashcode(resultSet, newDoc, eFqdnAddressIpMap,commonSchemaType); } long last = System.currentTimeMillis(); LOG.info(sql + "\n读取clickhouse EFqdnAddressIp时间:" + (last - start)); @@ -107,9 +165,9 @@ public class BaseClickhouseData { } } - public void BaseEIpVisitFqdn() { + void BaseEIpVisitFqdn() { initializeVertexMap(eIpVisitFqdnMap); - LOG.info("E_VISIT_V_IP_TO_V_FQDN resultMap初始化完成"); + LOG.info("R_VISIT_IP2FQDN resultMap初始化完成"); String sql = getEIpVisitFqdnSql(); long start = System.currentTimeMillis(); try { @@ -118,7 +176,8 @@ public class BaseClickhouseData { ResultSet resultSet = statement.executeQuery(sql); while (resultSet.next()) { BaseEdgeDocument newDoc = getRelationIpVisitFqdnDocument(resultSet); - putMapByHashcode(resultSet, newDoc, eIpVisitFqdnMap); + String commonSchemaType = resultSet.getString("common_schema_type"); + putMapByHashcode(resultSet, newDoc, eIpVisitFqdnMap,commonSchemaType); } long last = System.currentTimeMillis(); LOG.info(sql + "\n读取clickhouse EIpVisitFqdn时间:" + (last - start)); diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java b/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java index c38569e..4cb0770 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java @@ -1,10 +1,12 @@ package cn.ac.iie.dao; import cn.ac.iie.config.ApplicationConfig; -import cn.ac.iie.etl.relationship.LocateFqdn2Ip; -import cn.ac.iie.etl.relationship.VisitIp2Fqdn; -import cn.ac.iie.etl.vertex.Fqdn; -import cn.ac.iie.etl.vertex.Ip; +import cn.ac.iie.service.relationship.LocateFqdn2Ip; +import cn.ac.iie.service.relationship.LocateSubscriber2Ip; +import cn.ac.iie.service.relationship.VisitIp2Fqdn; +import cn.ac.iie.service.vertex.Fqdn; +import cn.ac.iie.service.vertex.Ip; +import cn.ac.iie.service.vertex.Subscriber; import cn.ac.iie.utils.ArangoDBConnect; import cn.ac.iie.utils.ExecutorThreadPool; import com.arangodb.entity.BaseDocument; @@ -16,6 +18,9 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.concurrent.CountDownLatch; +/** + * 更新图数据库业务类 + */ public class UpdateGraphData { private static final Logger LOG = LoggerFactory.getLogger(UpdateGraphData.class); private static ExecutorThreadPool pool = ExecutorThreadPool.getInstance(); @@ -38,10 +43,16 @@ public class UpdateGraphData { baseClickhouseData.BaseEIpVisitFqdn(); updateRelationIpVisitFqdn(); + + baseClickhouseData.BaseVertexSubscriber(); + updateVertexSubscriber(); + + baseClickhouseData.BaseRelationshipSubscriberLocateIp(); + updateRelationshipSubsciberLocateIp(); }catch (Exception e){ e.printStackTrace(); }finally { - ArangoDBConnect.clean(); + arangoManger.clean(); } long lastC = System.currentTimeMillis(); LOG.info("更新ArangoDb时间:"+(lastC - startC)); @@ -63,6 +74,38 @@ public class UpdateGraphData { } } + private void updateVertexSubscriber(){ + try { + countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); + for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { + HashMap> stringArrayListHashMap = BaseClickhouseData.vSubscriberMap.get(i); + LOG.info("vSubscriber baseDocumentHashMap大小:" + stringArrayListHashMap.size()); + Subscriber updateVSubscriber = new Subscriber(stringArrayListHashMap, arangoManger, "SUBSCRIBER", BaseArangoData.v_Subscriber_Map,countDownLatch); + updateVSubscriber.run(); + } + countDownLatch.await(); + LOG.info("---------SUBSCRIBER vertex 更新完毕---------"); + }catch (Exception e){ + e.printStackTrace(); + } + } + + private void updateRelationshipSubsciberLocateIp(){ + try { + countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); + for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { + HashMap> baseDocumentHashMap = BaseClickhouseData.eSubsciberLocateIpMap.get(i); + LOG.info("ESubsciberLocateIp baseDocumentHashMap大小:" + baseDocumentHashMap.size()); + LocateSubscriber2Ip rLocateSubscriber2IP = new LocateSubscriber2Ip(baseDocumentHashMap, arangoManger, "R_LOCATE_SUBSCRIBER2IP", BaseArangoData.e_Subsciber_Locate_Ip_Map, countDownLatch); + rLocateSubscriber2IP.run(); + } + countDownLatch.await(); + LOG.info("------------R_LOCATE_SUBSCRIBER2IP relationship 更新完毕----------------"); + }catch (Exception e){ + e.printStackTrace(); + } + } + private void updateVertexIp(){ try { countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/read/ReadClickhouseData.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/read/ReadClickhouseData.java similarity index 76% rename from IP-learning-graph/src/main/java/cn/ac/iie/etl/read/ReadClickhouseData.java rename to IP-learning-graph/src/main/java/cn/ac/iie/service/read/ReadClickhouseData.java index 550a2aa..f9f96e7 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/etl/read/ReadClickhouseData.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/read/ReadClickhouseData.java @@ -1,4 +1,4 @@ -package cn.ac.iie.etl.read; +package cn.ac.iie.service.read; import cn.ac.iie.config.ApplicationConfig; import com.arangodb.entity.BaseDocument; @@ -56,6 +56,37 @@ public class ReadClickhouseData { return newDoc; } + public static BaseDocument getVertexSubscriberDocument(ResultSet resultSet) throws SQLException { + String subscriberId = resultSet.getString("common_subscriber_id"); + long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); + long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); + BaseDocument newDoc = new BaseDocument(); + newDoc.setKey(subscriberId); + newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime); + newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime); + return newDoc; + } + + public static BaseEdgeDocument getRelationshipSubsciberLocateIpDocument(ResultSet resultSet) throws SQLException { + String subscriberId = resultSet.getString("common_subscriber_id"); + String framedIp = resultSet.getString("radius_framed_ip"); + long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); + long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); + long countTotal = resultSet.getLong("COUNT_TOTAL"); + + String key = subscriberId + "-" + framedIp; + BaseEdgeDocument newDoc = new BaseEdgeDocument(); + newDoc.setKey(key); + newDoc.setFrom("SUBSCRIBER/" + subscriberId); + newDoc.setTo("IP/" + framedIp); + newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime); + newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime); + newDoc.addAttribute("COUNT_TOTAL", countTotal); + + return newDoc; + + } + public static BaseEdgeDocument getRelationFqdnAddressIpDocument(ResultSet resultSet) throws SQLException { String vFqdn = resultSet.getString("FQDN"); BaseEdgeDocument newDoc = null; @@ -102,15 +133,14 @@ public class ReadClickhouseData { return newDoc; } - public static void putMapByHashcode(ResultSet resultSet, BaseEdgeDocument newDoc, HashMap>> map) throws SQLException { + public static void putMapByHashcode(ResultSet resultSet, BaseEdgeDocument newDoc, HashMap>> map,String schema) throws SQLException { if (newDoc != null){ String key = newDoc.getKey(); - String commonSchemaType = resultSet.getString("common_schema_type"); int i = Math.abs(key.hashCode()) % ApplicationConfig.THREAD_POOL_NUMBER; HashMap> documentHashMap = map.getOrDefault(i, new HashMap()); HashMap schemaHashMap = documentHashMap.getOrDefault(key, new HashMap<>()); - schemaHashMap.put(commonSchemaType, newDoc); + schemaHashMap.put(schema, newDoc); documentHashMap.put(key, schemaHashMap); } } @@ -178,11 +208,27 @@ public class ReadClickhouseData { return "SELECT * FROM ((" + sslSql + ") UNION ALL (" + httpSql + "))WHERE FQDN != ''"; } + public static String getVertexSubscriberSql(){ + long[] timeLimit = getTimeLimit(); + long maxTime = timeLimit[0]; + long minTime = timeLimit[1]; + String where = " common_recv_time >= "+minTime+" AND common_recv_time <= "+maxTime+" AND common_subscriber_id != '' AND radius_packet_type = 4 AND radius_acct_status_type = 1"; + return "SELECT common_subscriber_id,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME FROM radius_record_log WHERE"+where+" GROUP BY common_subscriber_id"; + } + + public static String getRelationshipSubsciberLocateIpSql(){ + long[] timeLimit = getTimeLimit(); + long maxTime = timeLimit[0]; + long minTime = timeLimit[1]; + String where = " common_recv_time >= "+minTime+" AND common_recv_time <= "+maxTime+" AND common_subscriber_id != '' AND radius_framed_ip != '' AND radius_packet_type = 4 AND radius_acct_status_type = 1"; + return "SELECT common_subscriber_id,radius_framed_ip,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME,COUNT(*) as COUNT_TOTAL FROM radius_record_log WHERE"+where+" GROUP BY common_subscriber_id,radius_framed_ip"; + } + private static long[] getTimeLimit() { -// long maxTime = System.currentTimeMillis() / 1000; -// long minTime = maxTime - 3600; - long maxTime = ApplicationConfig.READ_CLICKHOUSE_MAX_TIME; - long minTime = ApplicationConfig.READ_CLICKHOUSE_MIN_TIME; + long maxTime = System.currentTimeMillis() / 1000; + long minTime = maxTime - 3600; +// long maxTime = ApplicationConfig.READ_CLICKHOUSE_MAX_TIME; +// long minTime = ApplicationConfig.READ_CLICKHOUSE_MIN_TIME; return new long[]{maxTime, minTime}; } diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/read/ReadHistoryArangoData.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java similarity index 97% rename from IP-learning-graph/src/main/java/cn/ac/iie/etl/read/ReadHistoryArangoData.java rename to IP-learning-graph/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java index c4ca68b..1c62ced 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/etl/read/ReadHistoryArangoData.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java @@ -1,4 +1,4 @@ -package cn.ac.iie.etl.read; +package cn.ac.iie.service.read; import cn.ac.iie.utils.ArangoDBConnect; import com.arangodb.ArangoCursor; diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/relationship/LocateFqdn2Ip.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/relationship/LocateFqdn2Ip.java similarity index 93% rename from IP-learning-graph/src/main/java/cn/ac/iie/etl/relationship/LocateFqdn2Ip.java rename to IP-learning-graph/src/main/java/cn/ac/iie/service/relationship/LocateFqdn2Ip.java index cd92003..8bd8764 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/etl/relationship/LocateFqdn2Ip.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/relationship/LocateFqdn2Ip.java @@ -1,6 +1,6 @@ -package cn.ac.iie.etl.relationship; +package cn.ac.iie.service.relationship; -import cn.ac.iie.etl.update.Relationship; +import cn.ac.iie.service.update.Relationship; import cn.ac.iie.utils.ArangoDBConnect; import com.arangodb.entity.BaseEdgeDocument; diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/relationship/LocateSubscriber2Ip.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/relationship/LocateSubscriber2Ip.java new file mode 100644 index 0000000..6f8bff2 --- /dev/null +++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/relationship/LocateSubscriber2Ip.java @@ -0,0 +1,38 @@ +package cn.ac.iie.service.relationship; + +import cn.ac.iie.service.update.Relationship; +import cn.ac.iie.utils.ArangoDBConnect; +import com.arangodb.entity.BaseEdgeDocument; + +import java.util.HashMap; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; + +public class LocateSubscriber2Ip extends Relationship { + + public LocateSubscriber2Ip(HashMap> newDocumentHashMap, + ArangoDBConnect arangoManger, + String collectionName, + ConcurrentHashMap historyDocumentMap, + CountDownLatch countDownLatch) { + super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap, countDownLatch); + } + + @Override + protected BaseEdgeDocument mergeRelationship(HashMap newEdgeDocumentSchemaMap) { + BaseEdgeDocument doc = null; + if (newEdgeDocumentSchemaMap.size() == 1){ + Set strings = newEdgeDocumentSchemaMap.keySet(); + for (String key:strings){ + doc = newEdgeDocumentSchemaMap.getOrDefault(key,null); + } + } + return doc; + } + + @Override + protected void updateFunction(BaseEdgeDocument newEdgeDocument, BaseEdgeDocument historyEdgeDocument) { + super.updateFoundTime(newEdgeDocument,historyEdgeDocument); + } +} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/relationship/VisitIp2Fqdn.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/relationship/VisitIp2Fqdn.java similarity index 89% rename from IP-learning-graph/src/main/java/cn/ac/iie/etl/relationship/VisitIp2Fqdn.java rename to IP-learning-graph/src/main/java/cn/ac/iie/service/relationship/VisitIp2Fqdn.java index d2b66c3..afbb7fe 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/etl/relationship/VisitIp2Fqdn.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/relationship/VisitIp2Fqdn.java @@ -1,6 +1,6 @@ -package cn.ac.iie.etl.relationship; +package cn.ac.iie.service.relationship; -import cn.ac.iie.etl.update.Relationship; +import cn.ac.iie.service.update.Relationship; import cn.ac.iie.utils.ArangoDBConnect; import com.arangodb.entity.BaseEdgeDocument; diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/update/Relationship.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Relationship.java similarity index 97% rename from IP-learning-graph/src/main/java/cn/ac/iie/etl/update/Relationship.java rename to IP-learning-graph/src/main/java/cn/ac/iie/service/update/Relationship.java index 9b5a3bb..6910325 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/etl/update/Relationship.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Relationship.java @@ -1,4 +1,4 @@ -package cn.ac.iie.etl.update; +package cn.ac.iie.service.update; import cn.ac.iie.config.ApplicationConfig; import cn.ac.iie.utils.ArangoDBConnect; @@ -60,7 +60,7 @@ public class Relationship extends Thread { } } - private BaseEdgeDocument mergeRelationship(HashMap newEdgeDocumentSchemaMap) { + protected BaseEdgeDocument mergeRelationship(HashMap newEdgeDocumentSchemaMap) { BaseEdgeDocument newBaseEdgeDocument = new BaseEdgeDocument(); Set schemaSets = newEdgeDocumentSchemaMap.keySet(); Map properties = newBaseEdgeDocument.getProperties(); @@ -83,7 +83,7 @@ public class Relationship extends Thread { } private void updateRelationship(BaseEdgeDocument newEdgeDocument,BaseEdgeDocument historyEdgeDocument,ArrayList docInsert){ - if (historyEdgeDocument != null) { + if (historyEdgeDocument != null && newEdgeDocument != null) { updateFunction(newEdgeDocument, historyEdgeDocument); docInsert.add(historyEdgeDocument); } else { @@ -183,7 +183,7 @@ public class Relationship extends Thread { } } - protected void checkSchemaProperty(Map properties){ + private void checkSchemaProperty(Map properties){ if (!properties.containsKey("TLS_CNT_TOTAL")){ properties.put("TLS_CNT_TOTAL",0L); properties.put("TLS_CNT_RECENT",new long[7]); diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/update/Vertex.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Vertex.java similarity index 99% rename from IP-learning-graph/src/main/java/cn/ac/iie/etl/update/Vertex.java rename to IP-learning-graph/src/main/java/cn/ac/iie/service/update/Vertex.java index 1b7ba73..f34a510 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/etl/update/Vertex.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Vertex.java @@ -1,4 +1,4 @@ -package cn.ac.iie.etl.update; +package cn.ac.iie.service.update; import cn.ac.iie.config.ApplicationConfig; import cn.ac.iie.utils.ArangoDBConnect; diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/vertex/Fqdn.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/vertex/Fqdn.java similarity index 90% rename from IP-learning-graph/src/main/java/cn/ac/iie/etl/vertex/Fqdn.java rename to IP-learning-graph/src/main/java/cn/ac/iie/service/vertex/Fqdn.java index 426a38b..4251910 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/etl/vertex/Fqdn.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/vertex/Fqdn.java @@ -1,6 +1,6 @@ -package cn.ac.iie.etl.vertex; +package cn.ac.iie.service.vertex; -import cn.ac.iie.etl.update.Vertex; +import cn.ac.iie.service.update.Vertex; import cn.ac.iie.utils.ArangoDBConnect; import com.arangodb.entity.BaseDocument; import com.arangodb.entity.BaseEdgeDocument; diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/vertex/Ip.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/vertex/Ip.java similarity index 93% rename from IP-learning-graph/src/main/java/cn/ac/iie/etl/vertex/Ip.java rename to IP-learning-graph/src/main/java/cn/ac/iie/service/vertex/Ip.java index efa37cf..b5c5610 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/etl/vertex/Ip.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/vertex/Ip.java @@ -1,6 +1,6 @@ -package cn.ac.iie.etl.vertex; +package cn.ac.iie.service.vertex; -import cn.ac.iie.etl.update.Vertex; +import cn.ac.iie.service.update.Vertex; import cn.ac.iie.utils.ArangoDBConnect; import com.arangodb.entity.BaseDocument; import com.arangodb.entity.BaseEdgeDocument; diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/vertex/Subscriber.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/vertex/Subscriber.java new file mode 100644 index 0000000..597c6c6 --- /dev/null +++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/vertex/Subscriber.java @@ -0,0 +1,22 @@ +package cn.ac.iie.service.vertex; + +import cn.ac.iie.service.update.Vertex; +import cn.ac.iie.utils.ArangoDBConnect; +import com.arangodb.entity.BaseDocument; +import com.arangodb.entity.BaseEdgeDocument; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; + +public class Subscriber extends Vertex { + + public Subscriber(HashMap> newDocumentHashMap, + ArangoDBConnect arangoManger, + String collectionName, + ConcurrentHashMap historyDocumentMap, + CountDownLatch countDownLatch) { + super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap, countDownLatch); + } +} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java b/IP-learning-graph/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java index 45bb7e4..60e3b26 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java @@ -46,7 +46,7 @@ public class ArangoDBConnect { return arangoDB.db(ApplicationConfig.ARANGODB_DB_NAME); } - public static void clean(){ + public void clean(){ try { if (arangoDB != null){ arangoDB.shutdown(); diff --git a/IP-learning-graph/src/main/resources/application.properties b/IP-learning-graph/src/main/resources/application.properties index 9a1f229..3a3942b 100644 --- a/IP-learning-graph/src/main/resources/application.properties +++ b/IP-learning-graph/src/main/resources/application.properties @@ -1,5 +1,5 @@ #arangoDB参数配置 -arangoDB.host=192.168.40.127 +arangoDB.host=192.168.40.182 arangoDB.port=8529 arangoDB.user=root arangoDB.password=111111