diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseArangoData.java b/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseArangoData.java index 1be006a..c00523c 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseArangoData.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseArangoData.java @@ -1,69 +1,51 @@ package cn.ac.iie.dao; import cn.ac.iie.config.ApplicationConfig; -import cn.ac.iie.etl.ArangoEFqdnAddressIpToMap; -import cn.ac.iie.etl.ArangoEIpVisitFqdnToMap; -import cn.ac.iie.etl.ArangoVFqdnToMap; -import cn.ac.iie.etl.ArangoVIpToMap; +import cn.ac.iie.etl.read.ReadHistoryArangoData; import cn.ac.iie.utils.ArangoDBConnect; import cn.ac.iie.utils.ExecutorThreadPool; import com.arangodb.ArangoCursor; import com.arangodb.entity.BaseDocument; import com.arangodb.entity.BaseEdgeDocument; - import java.util.concurrent.ConcurrentHashMap; public class BaseArangoData { - public static ConcurrentHashMap v_Fqdn_Map = new ConcurrentHashMap<>(); - public static ConcurrentHashMap v_Ip_Map = new ConcurrentHashMap<>(); + public static ConcurrentHashMap v_Fqdn_Map = new ConcurrentHashMap<>(); + public static ConcurrentHashMap v_Ip_Map = new ConcurrentHashMap<>(); public static ConcurrentHashMap e_Fqdn_Address_Ip_Map = new ConcurrentHashMap<>(); public static ConcurrentHashMap e_Ip_Visit_Fqdn_Map = new ConcurrentHashMap<>(); - private static final ArangoDBConnect arangoDBConnect = ArangoDBConnect.getInstance(); + private static ArangoDBConnect arangoDBConnect = ArangoDBConnect.getInstance(); - private static final ExecutorThreadPool threadPool = ExecutorThreadPool.getInstance(); + private static ExecutorThreadPool threadPool = ExecutorThreadPool.getInstance(); - public static void BaseVFqdnDataMap() { - String sql = "LET FQDN = (FOR doc IN V_FQDN RETURN doc) return {max_time:MAX(FQDN[*].FIRST_FOUND_TIME),min_time:MIN(FQDN[*].FIRST_FOUND_TIME)}"; - long[] timeLimit = getTimeLimit(sql); - for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { - ArangoVFqdnToMap ArangoVFqdnToMap = new ArangoVFqdnToMap(arangoDBConnect, timeLimit[0], timeLimit[2],i); - threadPool.executor(ArangoVFqdnToMap); + public void baseDocumentDataMap(){ + readHistoryData("FQDN", v_Fqdn_Map); + readHistoryData("IP", v_Ip_Map); + readHistoryData("R_LOCATE_FQDN2IP", e_Fqdn_Address_Ip_Map); + readHistoryData("R_VISIT_IP2FQDN", e_Ip_Visit_Fqdn_Map); + threadPool.shutdown(); + threadPool.awaitThreadTask(); + } + + private void readHistoryData(String table, ConcurrentHashMap map){ + try { + long[] timeRange = getTimeRange(table); + for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { + String sql = getQuerySql(timeRange, i, table); + ReadHistoryArangoData readHistoryArangoData = new ReadHistoryArangoData(arangoDBConnect, sql, map); + threadPool.executor(readHistoryArangoData); + } + }catch (Exception e){ + e.printStackTrace(); } } - public static void BaseVIpDataMap() { - String sql = "LET IP = (FOR doc IN V_IP RETURN doc) return {max_time:MAX(IP[*].FIRST_FOUND_TIME),min_time:MIN(IP[*].FIRST_FOUND_TIME)}"; - long[] timeLimit = getTimeLimit(sql); - for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { - ArangoVIpToMap arangoVIpToMap = new ArangoVIpToMap(arangoDBConnect, timeLimit[0], timeLimit[2],i); - threadPool.executor(arangoVIpToMap); - } - } - - public static void BaseEFqdnAddressIpDataMap(){ - String sql = "LET e = (FOR doc IN E_ADDRESS_V_FQDN_TO_V_IP RETURN doc) return {max_time:MAX(e[*].FIRST_FOUND_TIME),min_time:MIN(e[*].FIRST_FOUND_TIME)}"; - long[] 
timeLimit = getTimeLimit(sql); - for (int i = 0;i < ApplicationConfig.THREAD_POOL_NUMBER;i++){ - ArangoEFqdnAddressIpToMap arangoEFqdnAddressIpToMap = new ArangoEFqdnAddressIpToMap(arangoDBConnect, timeLimit[0], timeLimit[2], i); - threadPool.executor(arangoEFqdnAddressIpToMap); - } - } - - public static void BaseEIpVisitFqdnDataMap(){ - String sql = "LET e = (FOR doc IN E_VISIT_V_IP_TO_V_FQDN RETURN doc) return {max_time:MAX(e[*].FIRST_FOUND_TIME),min_time:MIN(e[*].FIRST_FOUND_TIME)}"; - long[] timeLimit = getTimeLimit(sql); - for (int i = 0;i < ApplicationConfig.THREAD_POOL_NUMBER;i++){ - ArangoEIpVisitFqdnToMap arangoEIpVisitFqdnToMap = new ArangoEIpVisitFqdnToMap(arangoDBConnect, timeLimit[0], timeLimit[2], i); - threadPool.executor(arangoEIpVisitFqdnToMap); - } - } - - private static long[] getTimeLimit(String sql) { + private long[] getTimeRange(String table){ long minTime = 0L; long maxTime = 0L; - long diffTime = 0L; long startTime = System.currentTimeMillis(); + String sql = "LET doc = (FOR doc IN "+table+" RETURN doc) return {max_time:MAX(doc[*].FIRST_FOUND_TIME),min_time:MIN(doc[*].FIRST_FOUND_TIME)}"; ArangoCursor timeDoc = arangoDBConnect.executorQuery(sql, BaseDocument.class); try { if (timeDoc != null){ @@ -74,14 +56,24 @@ public class BaseArangoData { } long lastTime = System.currentTimeMillis(); System.out.println("查询最大最小时间用时:" + (lastTime - startTime)); - diffTime = (maxTime - minTime) / ApplicationConfig.THREAD_POOL_NUMBER; }else { System.out.println("获取最大最小时间异常"); } }catch (Exception e){ e.printStackTrace(); } - return new long[]{minTime, maxTime, diffTime}; + return new long[]{minTime, maxTime}; + } + private String getQuerySql(long[] timeRange,int threadNumber,String table){ + long minTime = timeRange[0]; + long maxTime = timeRange[1]; + long diffTime = (maxTime - minTime) / ApplicationConfig.THREAD_POOL_NUMBER; + long maxThreadTime = minTime + (threadNumber + 1)* diffTime; + long minThreadTime = minTime + threadNumber * diffTime; + return "FOR doc IN "+table+" filter doc.FIRST_FOUND_TIME >= "+minThreadTime+" and doc.FIRST_FOUND_TIME <= "+maxThreadTime+" RETURN doc"; + } + + } diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java b/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java index 10dce04..cf54b7d 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java @@ -6,6 +6,7 @@ import cn.ac.iie.etl.UpdateEIpVisitFqdn; import cn.ac.iie.etl.UpdateVFqdn; import cn.ac.iie.etl.UpdateVIP; import cn.ac.iie.utils.ClickhouseConnect; +import cn.ac.iie.utils.TopDomainUtils; import com.alibaba.druid.pool.DruidPooledConnection; import com.arangodb.entity.BaseDocument; import com.arangodb.entity.BaseEdgeDocument; @@ -13,11 +14,12 @@ import com.arangodb.entity.BaseEdgeDocument; import java.sql.Connection; import java.sql.ResultSet; import java.sql.Statement; +import java.util.ArrayList; import java.util.HashMap; public class BaseClickhouseData { private static final ClickhouseConnect manger = ClickhouseConnect.getInstance(); - private static HashMap> vFqdnMap = new HashMap<>(); + private static HashMap>> vFqdnMap = new HashMap<>(); private static HashMap> vIpMap = new HashMap<>(); private static HashMap> eFqdnAddressIpMap = new HashMap<>(); private static HashMap> eIpVisitFqdnMap = new HashMap<>(); @@ -26,46 +28,6 @@ public class BaseClickhouseData { public BaseClickhouseData(){} - public ResultSet BaseRealTimeVFqdn(){ - 
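
// ---- Editor's sketch, not part of the patch ----
// getQuerySql above derives diffTime with integer division, so the last
// window's upper bound (minTime + THREAD_POOL_NUMBER * diffTime) can fall
// short of maxTime, and documents in that tail are never read. A hedged,
// boundary-safe variant (method name hypothetical):
private String getQuerySqlClamped(long[] timeRange, int threadNumber, String table) {
    long minTime = timeRange[0];
    long maxTime = timeRange[1];
    long diffTime = (maxTime - minTime) / ApplicationConfig.THREAD_POOL_NUMBER;
    long minThreadTime = minTime + threadNumber * diffTime;
    // clamp the final worker's window to maxTime so truncation cannot drop the tail
    long maxThreadTime = (threadNumber == ApplicationConfig.THREAD_POOL_NUMBER - 1)
            ? maxTime
            : minTime + (threadNumber + 1) * diffTime;
    return "FOR doc IN " + table + " filter doc.FIRST_FOUND_TIME >= " + minThreadTime
            + " and doc.FIRST_FOUND_TIME <= " + maxThreadTime + " RETURN doc";
}
// ------------------------------------------------
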
long[] timeLimit = getTimeLimit(); - long maxTime = timeLimit[0]; - long minTime = timeLimit[1]; - String where = "recv_time >= "+minTime+" and recv_time <= "+maxTime+" and media_domain != '' "; - String sql = "SELECT media_domain AS FQDN_NAME,MIN(recv_time) AS FIRST_FOUND_TIME,MAX(recv_time) AS LAST_FOUND_TIME,COUNT( * ) AS FQDN_COUNT_TOTAL FROM media_expire_patch WHERE "+where+" GROUP BY media_domain"; - System.out.println(sql); - return manger.executorQuery(sql,connection,pstm); - } - - public ResultSet BaseRealTimeVIp(){ - long[] timeLimit = getTimeLimit(); - long maxTime = timeLimit[0]; - long minTime = timeLimit[1]; - String where = " recv_time >= "+minTime+" and recv_time <= "+maxTime; - String sql = "SELECT IP,location,MIN(recv_time) AS FIRST_FOUND_TIME,MAX(recv_time) AS LAST_FOUND_TIME,COUNT(*) AS IP_COUNT_TOTAL FROM(( SELECT s1_s_ip AS IP, s1_s_location_region AS location, recv_time FROM media_expire_patch where "+where+" ) UNION ALL ( SELECT s1_d_ip AS IP, s1_d_location_region AS location, recv_time FROM media_expire_patch where "+where+" )) GROUP BY IP,location"; - System.out.println(sql); - return manger.executorQuery(sql,connection,pstm); - } - - public ResultSet BaseReadTimeEFqdnAddressIp(){ - long[] timeLimit = getTimeLimit(); - long maxTime = timeLimit[0]; - long minTime = timeLimit[1]; - String where = "recv_time >= "+minTime+" and recv_time <= "+maxTime +" AND media_domain != '' AND s1_d_ip != '' "; - String sql = "SELECT media_domain AS V_FQDN,s1_d_ip AS V_IP,MIN( recv_time ) AS FIRST_FOUND_TIME,MAX( recv_time ) AS LAST_FOUND_TIME,COUNT( * ) AS COUNT_TOTAL FROM media_expire_patch WHERE "+where+" GROUP BY s1_d_ip,media_domain"; - System.out.println(sql); - return manger.executorQuery(sql,connection,pstm); - } - - public ResultSet BaseRealTimeEIpVisitFqdn(){ - long[] timeLimit = getTimeLimit(); - long maxTime = timeLimit[0]; - long minTime = timeLimit[1]; - String where = "recv_time >= "+minTime+" and recv_time <= "+maxTime+" AND s1_s_ip != '' AND media_domain != '' "; - String sql = "SELECT s1_s_ip AS V_IP,media_domain AS V_FQDN,MIN( recv_time ) AS FIRST_FOUND_TIME,MAX( recv_time ) AS LAST_FOUND_TIME,COUNT( * ) AS COUNT_TOTAL FROM media_expire_patch WHERE "+where+" GROUP BY s1_s_ip,media_domain"; - System.out.println(sql); - return manger.executorQuery(sql,connection,pstm); - } - private static long[] getTimeLimit(){ long maxTime = ApplicationConfig.READ_CLICKHOUSE_MAX_TIME; long minTime = ApplicationConfig.READ_CLICKHOUSE_MIN_TIME; @@ -74,7 +36,7 @@ public class BaseClickhouseData { static { for (int i = 0;i < ApplicationConfig.THREAD_POOL_NUMBER;i++){ - vFqdnMap.put(i,new HashMap()); + vFqdnMap.put(i,new HashMap>()); } System.out.println("V_FQDN resultMap初始化完成"); for (int i = 0;i < ApplicationConfig.THREAD_POOL_NUMBER;i++){ @@ -92,6 +54,7 @@ public class BaseClickhouseData { } public static void BaseVFqdn(){ + BaseVDomainFromReferer(); long[] timeLimit = getTimeLimit(); long maxTime = timeLimit[0]; long minTime = timeLimit[1]; @@ -115,13 +78,15 @@ public class BaseClickhouseData { newDoc.addAttribute("LAST_FOUND_TIME",lastFoundTime); newDoc.addAttribute("FQDN_COUNT_TOTAL",fqdnCountTotal); int i = fqdnName.hashCode() % ApplicationConfig.THREAD_POOL_NUMBER; - HashMap documentHashMap = vFqdnMap.getOrDefault(i, new HashMap()); - documentHashMap.put(fqdnName,newDoc); + HashMap> documentHashMap = vFqdnMap.getOrDefault(i, new HashMap<>()); + ArrayList documentArrayList = documentHashMap.getOrDefault(fqdnName, new ArrayList<>()); + documentArrayList.add(newDoc); + 
documentHashMap.put(fqdnName,documentArrayList); } long last = System.currentTimeMillis(); System.out.println("读取clickhouse v_FQDN时间:"+(last - start)); for (int i = 0;i < ApplicationConfig.THREAD_POOL_NUMBER;i++){ - HashMap baseDocumentHashMap = vFqdnMap.get(i); + HashMap> baseDocumentHashMap = vFqdnMap.get(i); UpdateVFqdn updateVFqdn = new UpdateVFqdn(baseDocumentHashMap); updateVFqdn.run(); } @@ -130,6 +95,43 @@ public class BaseClickhouseData { } } + private static void BaseVDomainFromReferer(){ + long[] timeLimit = getTimeLimit(); + long maxTime = timeLimit[0]; + long minTime = timeLimit[1]; + String where = "recv_time >= "+minTime+" and recv_time <= "+maxTime+" and s1_referer != '' "; + String sql = "SELECT s1_referer AS FQDN_NAME,MIN(recv_time) AS FIRST_FOUND_TIME,MAX(recv_time) AS LAST_FOUND_TIME,COUNT( * ) AS FQDN_COUNT_TOTAL FROM media_expire_patch WHERE "+where+" GROUP BY s1_referer"; + System.out.println(sql); + long start = System.currentTimeMillis(); + try { + DruidPooledConnection connection = manger.getConnection(); + Statement statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery(sql); + while (resultSet.next()){ + String referer = resultSet.getString("FQDN_NAME"); + String fqdnName = TopDomainUtils.getDomainFromUrl(referer); + long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); + long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); + long fqdnCountTotal = resultSet.getLong("FQDN_COUNT_TOTAL"); + BaseDocument newDoc = new BaseDocument(); + newDoc.setKey(fqdnName); + newDoc.addAttribute("FQDN_NAME",fqdnName); + newDoc.addAttribute("FIRST_FOUND_TIME",firstFoundTime); + newDoc.addAttribute("LAST_FOUND_TIME",lastFoundTime); + newDoc.addAttribute("FQDN_COUNT_TOTAL",fqdnCountTotal); + int i = fqdnName.hashCode() % ApplicationConfig.THREAD_POOL_NUMBER; + HashMap> documentHashMap = vFqdnMap.getOrDefault(i, new HashMap<>()); + ArrayList documentArrayList = documentHashMap.getOrDefault(fqdnName, new ArrayList<>()); + documentArrayList.add(newDoc); + documentHashMap.put(fqdnName,documentArrayList); + } + long last = System.currentTimeMillis(); + System.out.println("读取clickhouse v_FQDN时间:"+(last - start)); + }catch (Exception e){ + e.printStackTrace(); + } + } + public static void BaseVIp(){ long[] timeLimit = getTimeLimit(); long maxTime = timeLimit[0]; @@ -213,6 +215,33 @@ public class BaseClickhouseData { } } + public static void BaseEdgeFqdnSameFqdn(){ + long[] timeLimit = getTimeLimit(); + long maxTime = timeLimit[0]; + long minTime = timeLimit[1]; + String where = "recv_time >= "+minTime+" and recv_time <= "+maxTime +" AND media_domain != '' AND s1_d_ip != '' "; + String sql = "SELECT s1_domain AS V_FQDN,s1_referer,MIN(recv_time) AS FIRST_FOUND_TIME,MAX( recv_time ) AS LAST_FOUND_TIME,COUNT( * ) AS COUNT_TOTAL FROM media_expire_patch WHERE "+where+" GROUP BY s1_referer,s1_domain"; + System.out.println(sql); + try { + DruidPooledConnection connection = manger.getConnection(); + Statement statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery(sql); + while (resultSet.next()){ + String vFqdn = resultSet.getString("V_FQDN"); + String referer = resultSet.getString("s1_referer"); + String refererDomain = TopDomainUtils.getDomainFromUrl(referer); + long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); + long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); + long countTotal = resultSet.getLong("COUNT_TOTAL"); + String key = vFqdn+"-"+refererDomain; + + } + }catch (Exception e){ + 
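
// ---- Editor's sketch, not part of the patch ----
// Both BaseVFqdn and BaseVDomainFromReferer bucket documents with
// fqdnName.hashCode() % THREAD_POOL_NUMBER, but String.hashCode() can be
// negative and vFqdnMap is only pre-populated with buckets 0..N-1, so for a
// negative index the getOrDefault fallback map is filled and then silently
// discarded. Math.floorMod keeps the bucket in [0, N); helper name hypothetical:
static int bucketFor(String key, int buckets) {
    return Math.floorMod(key.hashCode(), buckets); // never negative, unlike %
}
// usage: int i = bucketFor(fqdnName, ApplicationConfig.THREAD_POOL_NUMBER);
// ------------------------------------------------
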
e.printStackTrace(); + } + + } + public static void BaseEIpVisitFqdn(){ long[] timeLimit = getTimeLimit(); long maxTime = timeLimit[0]; diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/ArangoEFqdnAddressIpToMap.java b/ip-learning-java-test/src/main/java/cn/ac/iie/etl/ArangoEFqdnAddressIpToMap.java deleted file mode 100644 index a2ca34b..0000000 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/ArangoEFqdnAddressIpToMap.java +++ /dev/null @@ -1,49 +0,0 @@ -package cn.ac.iie.etl; - -import cn.ac.iie.dao.BaseArangoData; -import cn.ac.iie.utils.ArangoDBConnect; -import com.arangodb.ArangoCursor; -import com.arangodb.entity.BaseEdgeDocument; - -import java.util.List; - -public class ArangoEFqdnAddressIpToMap implements Runnable{ - - private ArangoDBConnect arangoDBConnect; - private long finalMinTime; - private long diffTime; - private int threadNumber; - - private ArangoEFqdnAddressIpToMap(){} - - public ArangoEFqdnAddressIpToMap(ArangoDBConnect arangoDBConnect, long finalMinTime, long diffTime, int threadNumber) { - this.arangoDBConnect = arangoDBConnect; - this.finalMinTime = finalMinTime; - this.diffTime = diffTime; - this.threadNumber = threadNumber; - } - - public void run() { - String name = Thread.currentThread().getName(); - long maxThreadTime = finalMinTime + (threadNumber + 1) * diffTime; - long minThreadTime = finalMinTime + threadNumber * diffTime; - String query = "FOR doc IN E_ADDRESS_V_FQDN_TO_V_IP filter doc.FIRST_FOUND_TIME >= " + minThreadTime + " and doc.FIRST_FOUND_TIME <= " + maxThreadTime + " RETURN doc"; - System.out.println(name + ":" + query); - long s = System.currentTimeMillis(); - ArangoCursor docs = arangoDBConnect.executorQuery(query, BaseEdgeDocument.class); - if (docs != null){ - List baseDocuments = docs.asListRemaining(); - int i = 0; - for (BaseEdgeDocument doc : baseDocuments) { - String key = doc.getKey(); - BaseArangoData.e_Fqdn_Address_Ip_Map.put(key, doc); - i++; - } - System.out.println(name + ":共处理数据" + i); - long l = System.currentTimeMillis(); - System.out.println(name + "运行时间:" + (l - s)); - }else { - System.out.println("查询异常"); - } - } -} diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/ArangoVFqdnToMap.java b/ip-learning-java-test/src/main/java/cn/ac/iie/etl/ArangoVFqdnToMap.java deleted file mode 100644 index bbbeb5d..0000000 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/ArangoVFqdnToMap.java +++ /dev/null @@ -1,50 +0,0 @@ -package cn.ac.iie.etl; - -import cn.ac.iie.dao.BaseArangoData; -import cn.ac.iie.utils.ArangoDBConnect; -import com.arangodb.ArangoCursor; -import com.arangodb.entity.BaseDocument; - -import java.util.List; - -public class ArangoVFqdnToMap implements Runnable { - - private ArangoDBConnect arangoDBConnect; - private long finalMinTime; - private long diffTime; - private int threadNumber; - - private ArangoVFqdnToMap(){} - - public ArangoVFqdnToMap(ArangoDBConnect arangoDBConnect, long finalMinTime, long diffTime, int threadNumber) { - this.arangoDBConnect = arangoDBConnect; - this.finalMinTime = finalMinTime; - this.diffTime = diffTime; - this.threadNumber = threadNumber; - } - - public void run() { - String name = Thread.currentThread().getName(); - long maxThreadTime = finalMinTime + (threadNumber + 1)* diffTime; - long minThreadTime = finalMinTime + threadNumber * diffTime; - String query = "FOR doc IN V_FQDN filter doc.FIRST_FOUND_TIME >= "+minThreadTime+" and doc.FIRST_FOUND_TIME <= "+maxThreadTime+" RETURN doc"; - System.out.println(name+":"+query); - long s = 
System.currentTimeMillis(); - ArangoCursor docs = arangoDBConnect.executorQuery(query, BaseDocument.class); - - if (docs != null){ - List baseDocuments = docs.asListRemaining(); - int i = 0; - for (BaseDocument doc:baseDocuments){ - String key = doc.getKey(); - BaseArangoData.v_Fqdn_Map.put(key,doc); - i++; - } - System.out.println(name+":共处理数据"+ i); - long l = System.currentTimeMillis(); - System.out.println(name+"运行时间:"+(l-s)); - }else { - System.out.println("查询异常"); - } - } -} diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/ArangoVIpToMap.java b/ip-learning-java-test/src/main/java/cn/ac/iie/etl/ArangoVIpToMap.java deleted file mode 100644 index 0f8fcb4..0000000 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/ArangoVIpToMap.java +++ /dev/null @@ -1,48 +0,0 @@ -package cn.ac.iie.etl; - -import cn.ac.iie.dao.BaseArangoData; -import cn.ac.iie.utils.ArangoDBConnect; -import com.arangodb.ArangoCursor; -import com.arangodb.entity.BaseDocument; - -import java.util.List; - -public class ArangoVIpToMap implements Runnable { - - private ArangoDBConnect arangoDBConnect; - private long finalMinTime; - private long diffTime; - private int threadNumber; - - private ArangoVIpToMap() {} - - public ArangoVIpToMap(ArangoDBConnect arangoDBConnect, long finalMinTime, long diffTime, int threadNumber) { - this.arangoDBConnect = arangoDBConnect; - this.finalMinTime = finalMinTime; - this.diffTime = diffTime; - this.threadNumber = threadNumber; - } - - public void run() { - String name = Thread.currentThread().getName(); - long maxThreadTime = finalMinTime + (threadNumber + 1) * diffTime; - long minThreadTime = finalMinTime + threadNumber * diffTime; - String query = "FOR doc IN V_IP filter doc.FIRST_FOUND_TIME >= " + minThreadTime + " and doc.FIRST_FOUND_TIME <= " + maxThreadTime + " RETURN doc"; - System.out.println(name + ":" + query); - long s = System.currentTimeMillis(); - ArangoCursor docs = arangoDBConnect.executorQuery(query, BaseDocument.class); - if (docs != null){ - List baseDocuments = docs.asListRemaining(); - int i = 0; - for (BaseDocument doc : baseDocuments) { - String key = doc.getKey(); - BaseArangoData.v_Ip_Map.put(key, doc); - i++; - } - System.out.println(name + ":共处理数据" + i); - long l = System.currentTimeMillis(); - System.out.println(name + "运行时间:" + (l - s)); - } - } - -} diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/UpdateEFqdnAddressIp.java b/ip-learning-java-test/src/main/java/cn/ac/iie/etl/UpdateEFqdnAddressIp.java index ce1a9ba..0abbc05 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/UpdateEFqdnAddressIp.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/etl/UpdateEFqdnAddressIp.java @@ -13,7 +13,7 @@ import java.util.Set; public class UpdateEFqdnAddressIp implements Runnable { private HashMap documentHashMap; - private static final ArangoDBConnect arangoManger = ArangoDBConnect.getInstance(); + private static ArangoDBConnect arangoManger = ArangoDBConnect.getInstance(); public UpdateEFqdnAddressIp(HashMap documentHashMap) { this.documentHashMap = documentHashMap; diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/UpdateGraphsData.java b/ip-learning-java-test/src/main/java/cn/ac/iie/etl/UpdateGraphsData.java deleted file mode 100644 index c6e314a..0000000 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/UpdateGraphsData.java +++ /dev/null @@ -1,214 +0,0 @@ -package cn.ac.iie.etl; - -import cn.ac.iie.config.ApplicationConfig; -import cn.ac.iie.dao.BaseArangoData; -import cn.ac.iie.dao.BaseClickhouseData; 
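
// ---- Editor's sketch, not part of the patch ----
// The deleted reader classes (ArangoVFqdnToMap, ArangoVIpToMap, and the two
// edge readers) differed only in the collection queried and the target map,
// which is what the new ReadHistoryArangoData further below exploits. Since
// v_Fqdn_Map/v_Ip_Map hold BaseDocument while that reader is written against
// BaseEdgeDocument, a type parameter would let one class serve both vertex
// and edge maps; a minimal sketch, assuming executorQuery(String, Class<T>)
// as used elsewhere in this patch:
class TypedHistoryReader<T extends BaseDocument> extends Thread {
    private final ArangoDBConnect arangoDBConnect;
    private final String query;
    private final Class<T> type;
    private final ConcurrentHashMap<String, T> map;

    TypedHistoryReader(ArangoDBConnect conn, String query, Class<T> type, ConcurrentHashMap<String, T> map) {
        this.arangoDBConnect = conn;
        this.query = query;
        this.type = type;
        this.map = map;
    }

    @Override
    public void run() {
        ArangoCursor<T> docs = arangoDBConnect.executorQuery(query, type);
        if (docs != null) {
            for (T doc : docs.asListRemaining()) {
                map.put(doc.getKey(), doc); // getKey() is inherited from BaseDocument
            }
        }
    }
}
// ------------------------------------------------
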
-import cn.ac.iie.utils.ArangoDBConnect; -import cn.ac.iie.utils.ClickhouseConnect; -import com.arangodb.entity.BaseDocument; -import com.arangodb.entity.BaseEdgeDocument; - -import java.sql.ResultSet; -import java.util.ArrayList; - -public class UpdateGraphsData { - - private static final BaseClickhouseData baseClickhouseData = new BaseClickhouseData(); - - private static final ClickhouseConnect clickhouseManger = ClickhouseConnect.getInstance(); - - private static final ArangoDBConnect arangoManger = ArangoDBConnect.getInstance(); - - public static void updateVFqdn(){ - ResultSet resultSet = baseClickhouseData.BaseRealTimeVFqdn(); - try { - System.out.println("读取clickhouse成功"); - ArrayList docInsert = new ArrayList<>(); - ArrayList docUpdate = new ArrayList<>(); - int i = 0; - System.out.println("开始更新V_FQDN"); - while (resultSet.next()){ - i += 1; - String fqdnName = resultSet.getString("FQDN_NAME"); - long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); - long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); - long fqdnCountTotal = resultSet.getLong("FQDN_COUNT_TOTAL"); - BaseDocument document = BaseArangoData.v_Fqdn_Map.getOrDefault(fqdnName, null); - if (document != null){ - long countTotal = Long.parseLong(document.getAttribute("FQDN_COUNT_TOTAL").toString()); - document.addAttribute("LAST_FOUND_TIME",lastFoundTime); - document.addAttribute("FQDN_COUNT_TOTAL",countTotal+fqdnCountTotal); - docUpdate.add(document); - }else { - BaseDocument newDoc = new BaseDocument(); - newDoc.setKey(fqdnName); - newDoc.addAttribute("FQDN_NAME",fqdnName); - newDoc.addAttribute("FIRST_FOUND_TIME",firstFoundTime); - newDoc.addAttribute("LAST_FOUND_TIME",lastFoundTime); - newDoc.addAttribute("FQDN_COUNT_TOTAL",fqdnCountTotal); - docInsert.add(newDoc); - } - if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH){ - arangoManger.insertAndUpdate(docInsert,docUpdate,"V_FQDN"); - System.out.println("更新"+i); - i = 0; - } - } - if (i != 0){ - arangoManger.insertAndUpdate(docInsert,docUpdate,"V_FQDN"); - System.out.println("更新"+i); - } - }catch (Exception e){ - e.printStackTrace(); - }finally { - clickhouseManger.clear(baseClickhouseData.pstm,baseClickhouseData.connection); - } - } - - public static void updateVIp(){ - ResultSet resultSet = baseClickhouseData.BaseRealTimeVIp(); - try { - System.out.println("读取clickhouse成功"); - ArrayList docInsert = new ArrayList<>(); - ArrayList docUpdate = new ArrayList<>(); - int i = 0; - System.out.println("开始更新V_IP"); - while (resultSet.next()){ - i += 1; - String ip = resultSet.getString("IP"); - String location = resultSet.getString("location"); - long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); - long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); - long ipCountTotal = resultSet.getLong("IP_COUNT_TOTAL"); - BaseDocument document = BaseArangoData.v_Ip_Map.getOrDefault(ip, null); - if (document != null){ - long countTotal = Long.parseLong(document.getAttribute("IP_COUNT_TOTAL").toString()); - document.addAttribute("LAST_FOUND_TIME",lastFoundTime); - document.addAttribute("IP_COUNT_TOTAL",countTotal+ipCountTotal); - docUpdate.add(document); - }else { - BaseDocument newDoc = new BaseDocument(); - newDoc.setKey(ip); - newDoc.addAttribute("IP",ip); - newDoc.addAttribute("IP_LOCATION",location); - newDoc.addAttribute("FIRST_FOUND_TIME",firstFoundTime); - newDoc.addAttribute("LAST_FOUND_TIME",lastFoundTime); - newDoc.addAttribute("IP_COUNT_TOTAL",ipCountTotal); - docInsert.add(newDoc); - } - if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH){ - 
arangoManger.insertAndUpdate(docInsert,docUpdate,"V_IP"); - System.out.println("更新"+i); - i = 0; - } - } - if (i != 0){ - arangoManger.insertAndUpdate(docInsert,docUpdate,"V_IP"); - System.out.println("更新"+i); - } - }catch (Exception e){ - e.printStackTrace(); - }finally { - clickhouseManger.clear(baseClickhouseData.pstm,baseClickhouseData.connection); - } - } - - public static void updateEFqdnAddressIp(){ - ResultSet resultSet = baseClickhouseData.BaseReadTimeEFqdnAddressIp(); - try { - System.out.println("读取clickhouse成功"); - ArrayList docInsert = new ArrayList<>(); - ArrayList docUpdate = new ArrayList<>(); - int i = 0; - System.out.println("开始更新E_ADDRESS_V_FQDN_TO_V_IP"); - while (resultSet.next()){ - i += 1; - String vFqdn = resultSet.getString("V_FQDN"); - String vIp = resultSet.getString("V_IP"); - long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); - long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); - long countTotal = resultSet.getLong("COUNT_TOTAL"); - String key = vFqdn+"-"+vIp; - BaseEdgeDocument edgeDocument = BaseArangoData.e_Fqdn_Address_Ip_Map.getOrDefault(key, null); - if (edgeDocument != null){ - long updateCountTotal = Long.parseLong(edgeDocument.getAttribute("COUNT_TOTAL").toString()); - edgeDocument.addAttribute("LAST_FOUND_TIME",lastFoundTime); - edgeDocument.addAttribute("COUNT_TOTAL",countTotal+updateCountTotal); - docUpdate.add(edgeDocument); - }else { - BaseEdgeDocument newDoc = new BaseEdgeDocument(); - newDoc.setKey(key); - newDoc.setFrom("V_FQDN/"+vFqdn); - newDoc.setTo("V_IP/"+vIp); - newDoc.addAttribute("FIRST_FOUND_TIME",firstFoundTime); - newDoc.addAttribute("LAST_FOUND_TIME",lastFoundTime); - newDoc.addAttribute("COUNT_TOTAL",countTotal); - docInsert.add(newDoc); - } - if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH){ - arangoManger.insertAndUpdate(docInsert,docUpdate,"E_ADDRESS_V_FQDN_TO_V_IP"); - System.out.println("更新"+i); - i = 0; - } - } - if (i != 0){ - arangoManger.insertAndUpdate(docInsert,docUpdate,"E_ADDRESS_V_FQDN_TO_V_IP"); - System.out.println("更新"+i); - } - }catch (Exception e){ - e.printStackTrace(); - }finally { - clickhouseManger.clear(baseClickhouseData.pstm,baseClickhouseData.connection); - } - } - - public static void updateEIpVisitFqdn(){ - ResultSet resultSet = baseClickhouseData.BaseRealTimeEIpVisitFqdn(); - try { - System.out.println("读取clickhouse成功"); - ArrayList docInsert = new ArrayList<>(); - ArrayList docUpdate = new ArrayList<>(); - int i = 0; - System.out.println("开始更新E_VISIT_V_IP_TO_V_FQDN"); - while (resultSet.next()){ - i += 1; - String vIp = resultSet.getString("V_IP"); - String vFqdn = resultSet.getString("V_FQDN"); - long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); - long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); - long countTotal = resultSet.getLong("COUNT_TOTAL"); - String key = vIp +"-"+ vFqdn; - BaseEdgeDocument edgeDocument = BaseArangoData.e_Ip_Visit_Fqdn_Map.getOrDefault(key, null); - if (edgeDocument != null){ - long updateCountTotal = Long.parseLong(edgeDocument.getAttribute("COUNT_TOTAL").toString()); - edgeDocument.addAttribute("LAST_FOUND_TIME",lastFoundTime); - edgeDocument.addAttribute("COUNT_TOTAL",countTotal+updateCountTotal); - docUpdate.add(edgeDocument); - }else { - BaseEdgeDocument newDoc = new BaseEdgeDocument(); - newDoc.setKey(key); - newDoc.setFrom("V_IP/"+vIp); - newDoc.setTo("V_FQDN/"+vFqdn); - newDoc.addAttribute("FIRST_FOUND_TIME",firstFoundTime); - newDoc.addAttribute("LAST_FOUND_TIME",lastFoundTime); - newDoc.addAttribute("COUNT_TOTAL",countTotal); - 
docInsert.add(newDoc);
-                }
-                if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH){
-                    arangoManger.insertAndUpdate(docInsert,docUpdate,"E_VISIT_V_IP_TO_V_FQDN");
-                    System.out.println("更新"+i);
-                    i = 0;
-                }
-            }
-            if (i != 0){
-                arangoManger.insertAndUpdate(docInsert,docUpdate,"E_VISIT_V_IP_TO_V_FQDN");
-                System.out.println("更新"+i);
-            }
-        }catch (Exception e){
-            e.printStackTrace();
-        }finally {
-            clickhouseManger.clear(baseClickhouseData.pstm,baseClickhouseData.connection);
-        }
-    }
-
-}
diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/UpdateVFqdn.java b/ip-learning-java-test/src/main/java/cn/ac/iie/etl/UpdateVFqdn.java
index eba800f..037dc40 100644
--- a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/UpdateVFqdn.java
+++ b/ip-learning-java-test/src/main/java/cn/ac/iie/etl/UpdateVFqdn.java
@@ -7,15 +7,16 @@ import com.arangodb.entity.BaseDocument;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Map;
 import java.util.Set;
 
 public class UpdateVFqdn implements Runnable{
 
-    private HashMap<String, BaseDocument> documentHashMap;
+    private HashMap<String, ArrayList<BaseDocument>> documentHashMap;
 
-    private static final ArangoDBConnect arangoManger = ArangoDBConnect.getInstance();
+    private static ArangoDBConnect arangoManger = ArangoDBConnect.getInstance();
 
-    public UpdateVFqdn(HashMap<String, BaseDocument> documentHashMap) {
+    public UpdateVFqdn(HashMap<String, ArrayList<BaseDocument>> documentHashMap) {
         this.documentHashMap = documentHashMap;
     }
 
@@ -27,7 +28,9 @@ public class UpdateVFqdn implements Runnable{
         int i = 0;
         try {
             for (String key:keySet){
-                BaseDocument newDocument = documentHashMap.getOrDefault(key, null);
+                ArrayList<BaseDocument> documentArrayList = documentHashMap.getOrDefault(key, null);
+                BaseDocument newDocument = mergeVFqdn(documentArrayList);
+
                 if (newDocument != null){
                     i += 1;
                     BaseDocument document = BaseArangoData.v_Fqdn_Map.getOrDefault(key, null);
@@ -57,4 +60,35 @@ public class UpdateVFqdn implements Runnable{
         }
     }
+
+    private BaseDocument mergeVFqdn(ArrayList<BaseDocument> documentArrayList){
+        if (documentArrayList == null || documentArrayList.isEmpty()){
+            return null;
+        }else if (documentArrayList.size() == 1){
+            return documentArrayList.get(0);
+        }else {
+            BaseDocument document = new BaseDocument();
+            Map<String, Object> properties = document.getProperties();
+            for (BaseDocument doc:documentArrayList){
+                if (properties.isEmpty()){
+                    document = doc;
+                    properties = doc.getProperties();
+                }else {
+                    long firstFoundTime = Long.parseLong(properties.getOrDefault("FIRST_FOUND_TIME", 0L).toString());
+                    long docFirstFoundTime = Long.parseLong(doc.getAttribute("FIRST_FOUND_TIME").toString());
+                    properties.put("FIRST_FOUND_TIME", firstFoundTime < docFirstFoundTime ? firstFoundTime : docFirstFoundTime);
+
+                    long lastFoundTime = Long.parseLong(properties.getOrDefault("LAST_FOUND_TIME", 0L).toString());
+                    long docLastFoundTime = Long.parseLong(doc.getAttribute("LAST_FOUND_TIME").toString());
+                    properties.put("LAST_FOUND_TIME", lastFoundTime > docLastFoundTime ? lastFoundTime : docLastFoundTime);
+
+                    long fqdnCountTotal = Long.parseLong(properties.getOrDefault("FQDN_COUNT_TOTAL", 0L).toString());
+                    long docFqdnCountTotal = Long.parseLong(doc.getAttribute("FQDN_COUNT_TOTAL").toString());
+                    properties.put("FQDN_COUNT_TOTAL", fqdnCountTotal + docFqdnCountTotal);
+                }
+            }
+            document.setProperties(properties);
+            return document;
+        }
+    }
 }
diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/ArangoEIpVisitFqdnToMap.java b/ip-learning-java-test/src/main/java/cn/ac/iie/etl/read/ReadHistoryArangoData.java
similarity index 51%
rename from ip-learning-java-test/src/main/java/cn/ac/iie/etl/ArangoEIpVisitFqdnToMap.java
rename to ip-learning-java-test/src/main/java/cn/ac/iie/etl/read/ReadHistoryArangoData.java
index 0bf7ca9..971b29b 100644
--- a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/ArangoEIpVisitFqdnToMap.java
+++ b/ip-learning-java-test/src/main/java/cn/ac/iie/etl/read/ReadHistoryArangoData.java
@@ -1,49 +1,44 @@
-package cn.ac.iie.etl;
+package cn.ac.iie.etl.read;
 
-import cn.ac.iie.dao.BaseArangoData;
 import cn.ac.iie.utils.ArangoDBConnect;
 import com.arangodb.ArangoCursor;
 import com.arangodb.entity.BaseEdgeDocument;
 
 import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
 
-public class ArangoEIpVisitFqdnToMap implements Runnable {
+/**
+ * @author wlh
+ * Reads the full ArangoDB history data on multiple threads and packs it into a map.
+ */
+public class ReadHistoryArangoData extends Thread {
 
     private ArangoDBConnect arangoDBConnect;
-    private long finalMinTime;
-    private long diffTime;
-    private int threadNumber;
+    private String query;
+    private ConcurrentHashMap<String, BaseEdgeDocument> map;
 
-    private ArangoEIpVisitFqdnToMap(){}
-
-    public ArangoEIpVisitFqdnToMap(ArangoDBConnect arangoDBConnect, long finalMinTime, long diffTime, int threadNumber) {
+    public ReadHistoryArangoData(ArangoDBConnect arangoDBConnect, String query, ConcurrentHashMap<String, BaseEdgeDocument> map) {
         this.arangoDBConnect = arangoDBConnect;
-        this.finalMinTime = finalMinTime;
-        this.diffTime = diffTime;
-        this.threadNumber = threadNumber;
+        this.query = query;
+        this.map = map;
     }
 
+    @Override
     public void run() {
         String name = Thread.currentThread().getName();
-        long maxThreadTime = finalMinTime + (threadNumber + 1) * diffTime;
-        long minThreadTime = finalMinTime + threadNumber * diffTime;
-        String query = "FOR doc IN E_VISIT_V_IP_TO_V_FQDN filter doc.FIRST_FOUND_TIME >= " + minThreadTime + " and doc.FIRST_FOUND_TIME <= " + maxThreadTime + " RETURN doc";
         System.out.println(name + ":" + query);
         long s = System.currentTimeMillis();
         ArangoCursor<BaseEdgeDocument> docs = arangoDBConnect.executorQuery(query, BaseEdgeDocument.class);
-        if (docs != null){
             List<BaseEdgeDocument> baseDocuments = docs.asListRemaining();
             int i = 0;
             for (BaseEdgeDocument doc : baseDocuments) {
                 String key = doc.getKey();
-                BaseArangoData.e_Ip_Visit_Fqdn_Map.put(key, doc);
+                map.put(key, doc);
                 i++;
             }
             System.out.println(name + ":共处理数据" + i);
             long l = System.currentTimeMillis();
             System.out.println(name + "运行时间:" + (l - s));
-        }else {
-            System.out.println("查询异常");
-        }
     }
 }
diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/relationship/LocateFqdn2Ip.java b/ip-learning-java-test/src/main/java/cn/ac/iie/etl/relationship/LocateFqdn2Ip.java
new file mode 100644
index 0000000..3988096
--- /dev/null
+++ b/ip-learning-java-test/src/main/java/cn/ac/iie/etl/relationship/LocateFqdn2Ip.java
@@ -0,0 +1,31 @@
+package cn.ac.iie.etl.relationship;
+
+import cn.ac.iie.etl.update.Relationship;
+import cn.ac.iie.utils.ArangoDBConnect;
+import com.arangodb.entity.BaseEdgeDocument;
+
+import java.util.HashMap;
+import java.util.Map;
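
// ---- Editor's usage sketch, not part of the patch ----
// How BaseArangoData.baseDocumentDataMap drives the reader above: one
// ReadHistoryArangoData per pool slot, each with its own time-window AQL, all
// writing into one ConcurrentHashMap (getTimeRange/getQuerySql are private in
// BaseArangoData; shown inline here for illustration):
ExecutorThreadPool threadPool = ExecutorThreadPool.getInstance();
long[] timeRange = getTimeRange("R_VISIT_IP2FQDN");            // min/max FIRST_FOUND_TIME
for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) {
    String aql = getQuerySql(timeRange, i, "R_VISIT_IP2FQDN"); // this thread's window
    threadPool.executor(new ReadHistoryArangoData(arangoDBConnect, aql, BaseArangoData.e_Ip_Visit_Fqdn_Map));
}
threadPool.shutdown();        // stop accepting new tasks
threadPool.awaitThreadTask(); // block until every reader finishes
// ------------------------------------------------------
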
+import java.util.concurrent.ConcurrentHashMap; + +public class LocateFqdn2Ip extends Relationship { + + public LocateFqdn2Ip(HashMap> newDocumentHashMap, + ArangoDBConnect arangoManger, + String collectionName, + ConcurrentHashMap historyDocumentMap) { + super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap); + } + + @Override + protected void mergeFunction(Map properties, BaseEdgeDocument schemaEdgeDoc){ + super.mergeFunction(properties,schemaEdgeDoc); + super.mergeDistinctClientIp(properties,schemaEdgeDoc); + } + + @Override + protected void updateFunction(BaseEdgeDocument newEdgeDocument, BaseEdgeDocument historyEdgeDocument) { + super.updateFunction(newEdgeDocument, historyEdgeDocument); + super.updateDistinctClientIp(newEdgeDocument, historyEdgeDocument); + } +} diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/relationship/VisitIp2Fqdn.java b/ip-learning-java-test/src/main/java/cn/ac/iie/etl/relationship/VisitIp2Fqdn.java new file mode 100644 index 0000000..efafb16 --- /dev/null +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/etl/relationship/VisitIp2Fqdn.java @@ -0,0 +1,17 @@ +package cn.ac.iie.etl.relationship; + +import cn.ac.iie.etl.update.Relationship; +import cn.ac.iie.utils.ArangoDBConnect; +import com.arangodb.entity.BaseEdgeDocument; + +import java.util.HashMap; +import java.util.concurrent.ConcurrentHashMap; + +public class VisitIp2Fqdn extends Relationship { + public VisitIp2Fqdn(HashMap> newDocumentHashMap, + ArangoDBConnect arangoManger, + String collectionName, + ConcurrentHashMap historyDocumentMap) { + super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap); + } +} diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/update/Relationship.java b/ip-learning-java-test/src/main/java/cn/ac/iie/etl/update/Relationship.java new file mode 100644 index 0000000..78141ae --- /dev/null +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/etl/update/Relationship.java @@ -0,0 +1,191 @@ +package cn.ac.iie.etl.update; + +import cn.ac.iie.config.ApplicationConfig; +import cn.ac.iie.utils.ArangoDBConnect; +import com.arangodb.entity.BaseEdgeDocument; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + +public class Relationship extends Thread { + + protected HashMap> newDocumentHashMap; + protected ArangoDBConnect arangoManger; + protected String collectionName; + protected ConcurrentHashMap historyDocumentMap; + + public Relationship(HashMap> newDocumentHashMap, + ArangoDBConnect arangoManger, + String collectionName, + ConcurrentHashMap historyDocumentMap) { + this.newDocumentHashMap = newDocumentHashMap; + this.arangoManger = arangoManger; + this.collectionName = collectionName; + this.historyDocumentMap = historyDocumentMap; + } + + @Override + public void run() { + Set keySet = newDocumentHashMap.keySet(); + ArrayList docInsert = new ArrayList<>(); + int i = 0; + try { + for (String key : keySet) { + HashMap newEdgeDocumentSchemaMap = newDocumentHashMap.getOrDefault(key, null); + if (newEdgeDocumentSchemaMap != null) { + BaseEdgeDocument newEdgeDocument = mergeRelationship(newEdgeDocumentSchemaMap); + i += 1; + BaseEdgeDocument historyEdgeDocument = historyDocumentMap.getOrDefault(key, null); + updateRelationship(newEdgeDocument,historyEdgeDocument,docInsert); + if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) { + arangoManger.overwrite(docInsert, collectionName); + System.out.println("更新"+collectionName+":" + i); + i = 0; + } + } + } + if (i != 0) { + arangoManger.overwrite(docInsert, 
collectionName); + System.out.println("更新"+collectionName+":" + i); + } + } catch (Exception e) { + e.printStackTrace(); + System.out.println(e.toString()); + } + } + + private BaseEdgeDocument mergeRelationship(HashMap newEdgeDocumentSchemaMap) { + BaseEdgeDocument newBaseEdgeDocument = new BaseEdgeDocument(); + Set schemaSets = newEdgeDocumentSchemaMap.keySet(); + Map properties = newBaseEdgeDocument.getProperties(); + + for (String schema : schemaSets) { + BaseEdgeDocument schemaEdgeDoc = newEdgeDocumentSchemaMap.get(schema); + if (!properties.isEmpty()) { + mergeFunction(properties, schemaEdgeDoc); + } else { + newBaseEdgeDocument = schemaEdgeDoc; + properties = schemaEdgeDoc.getProperties(); + } + setSchemaCount(schema, schemaEdgeDoc, properties); + } + properties.remove("COUNT_TOTAL"); + checkSchemaProperty(properties); + + newBaseEdgeDocument.setProperties(properties); + return newBaseEdgeDocument; + } + + private void updateRelationship(BaseEdgeDocument newEdgeDocument,BaseEdgeDocument historyEdgeDocument,ArrayList docInsert){ + if (historyEdgeDocument != null) { + updateFunction(newEdgeDocument, historyEdgeDocument); + docInsert.add(historyEdgeDocument); + } else { + docInsert.add(newEdgeDocument); + } + } + + protected void updateFunction(BaseEdgeDocument newEdgeDocument,BaseEdgeDocument historyEdgeDocument){ + updateFoundTime(newEdgeDocument,historyEdgeDocument); + setSchemaCntByHistory(historyEdgeDocument,"TLS_CNT_RECENT","TLS_CNT_TOTAL",newEdgeDocument); + setSchemaCntByHistory(historyEdgeDocument,"HTTP_CNT_RECENT","HTTP_CNT_TOTAL",newEdgeDocument); +// updateDistinctClientIp(newEdgeDocument,historyEdgeDocument); + } + + protected void updateFoundTime(BaseEdgeDocument newEdgeDocument,BaseEdgeDocument historyEdgeDocument){ + Object lastFoundTime = newEdgeDocument.getAttribute("LAST_FOUND_TIME"); + historyEdgeDocument.addAttribute("LAST_FOUND_TIME", lastFoundTime); + } + + protected void setSchemaCntByHistory(BaseEdgeDocument historyEdgeDocument,String schema,String totalSchema,BaseEdgeDocument newEdgeDocument){ + long countTotal = Long.parseLong(newEdgeDocument.getAttribute(totalSchema).toString()); + long updateCountTotal = Long.parseLong(historyEdgeDocument.getAttribute(totalSchema).toString()); + + ArrayList cntRecent = (ArrayList) historyEdgeDocument.getAttribute(schema); + Long[] cntRecentsSrc = cntRecent.toArray(new Long[cntRecent.size()]); + Long[] cntRecentsDst = new Long[7]; + System.arraycopy(cntRecentsSrc, 0, cntRecentsDst, 1, cntRecentsSrc.length - 1); + cntRecentsDst[0] = countTotal; + + historyEdgeDocument.addAttribute(schema, cntRecentsDst); + historyEdgeDocument.addAttribute(totalSchema, countTotal + updateCountTotal); + } + + protected void mergeFunction(Map properties, BaseEdgeDocument schemaEdgeDoc) { + mergeFoundTime(properties, schemaEdgeDoc); +// mergeDistinctClientIp(properties,schemaEdgeDoc); + } + + protected void mergeDistinctClientIp(Map properties, BaseEdgeDocument schemaEdgeDoc){ + String[] schemaDistCipRecents = (String[]) schemaEdgeDoc.getAttribute("DIST_CIP_RECENT"); + String[] distCipRecents = (String[]) properties.get("DIST_CIP_RECENT"); + Object[] mergeClientIp = distinctIp(schemaDistCipRecents, distCipRecents); + properties.put("DIST_CIP_RECENT", mergeClientIp); + properties.put("DIST_CIP_TOTAL",mergeClientIp); + } + + protected void updateDistinctClientIp(BaseEdgeDocument newEdgeDocument,BaseEdgeDocument edgeDocument){ + ArrayList distCipTotal = (ArrayList) edgeDocument.getAttribute("DIST_CIP_TOTAL"); + String[] distCipTotalsSrc = 
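
// ---- Editor's sketch, not part of the patch ----
// setSchemaCntByHistory above maintains *_CNT_RECENT as a seven-slot sliding
// window: the newest batch count enters at index 0, older values shift right,
// and the oldest falls off. The shift in isolation:
long[] recent = {5L, 4L, 3L, 2L, 1L, 0L, 0L};               // last seven runs
long[] shifted = new long[7];
System.arraycopy(recent, 0, shifted, 1, recent.length - 1); // recent[6] is dropped
shifted[0] = 9L;                                            // this run's COUNT_TOTAL
// shifted == {9, 5, 4, 3, 2, 1, 0}
// ------------------------------------------------
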
distCipTotal.toArray(new String[distCipTotal.size()]);
+
+        Object[] distCipRecentsSrc = (Object[])newEdgeDocument.getAttribute("DIST_CIP_RECENT");
+        if (distCipTotalsSrc.length == 30) {
+            Object[] distCipTotals = distinctIp(distCipTotalsSrc, distCipRecentsSrc);
+            edgeDocument.addAttribute("DIST_CIP_TOTAL", distCipTotals);
+        }
+        edgeDocument.addAttribute("DIST_CIP_RECENT", distCipRecentsSrc);
+    }
+
+    protected Object[] distinctIp(Object[] distCipTotalsSrc,Object[] distCipRecentsSrc){
+        HashSet<Object> dIpSet = new HashSet<>();
+        dIpSet.addAll(Arrays.asList(distCipRecentsSrc));
+        dIpSet.addAll(Arrays.asList(distCipTotalsSrc));
+        Object[] distCipTotals = dIpSet.toArray();
+        if (distCipTotals.length > 30) {
+            // arraycopy of an array onto itself is a no-op; copyOf actually truncates to 30
+            distCipTotals = Arrays.copyOf(distCipTotals, 30);
+        }
+        return distCipTotals;
+    }
+
+    protected void mergeFoundTime(Map<String, Object> properties, BaseEdgeDocument schemaEdgeDoc) {
+        long schemaFirstFoundTime = Long.parseLong(schemaEdgeDoc.getAttribute("FIRST_FOUND_TIME").toString());
+        long firstFoundTime = Long.parseLong(properties.get("FIRST_FOUND_TIME").toString());
+        properties.put("FIRST_FOUND_TIME", schemaFirstFoundTime < firstFoundTime ? schemaFirstFoundTime : firstFoundTime);
+        long schemaLastFoundTime = Long.parseLong(schemaEdgeDoc.getAttribute("LAST_FOUND_TIME").toString());
+        long lastFoundTime = Long.parseLong(properties.get("LAST_FOUND_TIME").toString());
+        properties.put("LAST_FOUND_TIME", schemaLastFoundTime > lastFoundTime ? schemaLastFoundTime : lastFoundTime);
+    }
+
+    protected void setSchemaCount(String schema, BaseEdgeDocument schemaEdgeDoc, Map<String, Object> properties) {
+        switch (schema) {
+            case "HTTP":
+                long httpCntTotal = Long.parseLong(schemaEdgeDoc.getAttribute("COUNT_TOTAL").toString());
+                properties.put("HTTP_CNT_TOTAL", httpCntTotal);
+                long[] httpCntRecentsDst = new long[7];
+                httpCntRecentsDst[0] = httpCntTotal;
+                properties.put("HTTP_CNT_RECENT", httpCntRecentsDst);
+                break;
+            case "SSL":
+                long tlsCntTotal = Long.parseLong(schemaEdgeDoc.getAttribute("COUNT_TOTAL").toString());
+                properties.put("TLS_CNT_TOTAL", tlsCntTotal);
+                long[] tlsCntRecentsDst = new long[7];
+                tlsCntRecentsDst[0] = tlsCntTotal;
+                properties.put("TLS_CNT_RECENT", tlsCntRecentsDst);
+                break;
+            default:
+                break;
+        }
+    }
+
+    protected void checkSchemaProperty(Map<String, Object> properties){
+        if (!properties.containsKey("TLS_CNT_TOTAL")){
+            properties.put("TLS_CNT_TOTAL",0L);
+            properties.put("TLS_CNT_RECENT",new long[7]);
+        }
+        // two independent ifs: with else-if the HTTP default was skipped
+        // whenever the TLS default had already been applied
+        if (!properties.containsKey("HTTP_CNT_TOTAL")){
+            properties.put("HTTP_CNT_TOTAL",0L);
+            properties.put("HTTP_CNT_RECENT",new long[7]);
+        }
+    }
+
+
+}
diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/update/Vertex.java b/ip-learning-java-test/src/main/java/cn/ac/iie/etl/update/Vertex.java
new file mode 100644
index 0000000..88da292
--- /dev/null
+++ b/ip-learning-java-test/src/main/java/cn/ac/iie/etl/update/Vertex.java
@@ -0,0 +1,116 @@
+package cn.ac.iie.etl.update;
+
+import cn.ac.iie.config.ApplicationConfig;
+import cn.ac.iie.utils.ArangoDBConnect;
+import com.arangodb.entity.BaseDocument;
+import com.arangodb.entity.BaseEdgeDocument;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @author wlh
+ * Updates vertex data with multiple threads.
+ */
+public class Vertex extends Thread{
+
+    protected HashMap<String, ArrayList<BaseDocument>> newDocumentHashMap;
+    protected ArangoDBConnect arangoManger;
+    protected String collectionName;
+    protected ConcurrentHashMap<String, BaseDocument> historyDocumentMap;
+
+    public Vertex(HashMap<String, ArrayList<BaseDocument>> newDocumentHashMap,
+                  ArangoDBConnect arangoManger,
+                  String collectionName,
+                  ConcurrentHashMap<String, BaseDocument> historyDocumentMap){
+        this.newDocumentHashMap = newDocumentHashMap;
+        this.arangoManger = arangoManger;
+        this.collectionName = collectionName;
+        this.historyDocumentMap = historyDocumentMap;
+    }
+
+    @Override
+    public void run() {
+        Set<String> keySet = newDocumentHashMap.keySet();
+        ArrayList<BaseDocument> docInsert = new ArrayList<>();
+        int i = 0;
+        try {
+            for (String key:keySet){
+                ArrayList<BaseDocument> documentArrayList = newDocumentHashMap.getOrDefault(key, null);
+                BaseDocument newDocument = mergeVertex(documentArrayList);
+                if (newDocument != null){
+                    i += 1;
+                    BaseDocument historyDocument = historyDocumentMap.getOrDefault(key, null);
+                    updateVertex(newDocument,historyDocument,docInsert);
+                }
+                if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH){
+                    arangoManger.overwrite(docInsert,collectionName);
+                    System.out.println("更新"+i);
+                    i = 0;
+                }
+            }
+            if (i != 0){
+                arangoManger.overwrite(docInsert,collectionName);
+                System.out.println("更新"+i);
+            }
+        }catch (Exception e){
+            e.printStackTrace();
+        }
+    }
+
+    private void updateVertex(BaseDocument newDocument,BaseDocument historyDocument,ArrayList<BaseDocument> docInsert){
+        if (historyDocument != null){
+            updateFunction(newDocument,historyDocument);
+            docInsert.add(historyDocument);
+        }else {
+            docInsert.add(newDocument);
+        }
+    }
+
+    protected void updateFunction(BaseDocument newDocument,BaseDocument historyDocument){
+        updateFoundTime(newDocument,historyDocument);
+    }
+
+    private void updateFoundTime(BaseDocument newDocument,BaseDocument historyDocument){
+        Object lastFoundTime = newDocument.getAttribute("LAST_FOUND_TIME");
+        historyDocument.addAttribute("LAST_FOUND_TIME",lastFoundTime);
+    }
+
+    private BaseDocument mergeVertex(ArrayList<BaseDocument> documentArrayList){
+        if (documentArrayList == null || documentArrayList.isEmpty()){
+            return null;
+        }else if (documentArrayList.size() == 1){
+            return documentArrayList.get(0);
+        }else {
+            BaseDocument document = new BaseDocument();
+            Map<String, Object> properties = document.getProperties();
+            for (BaseDocument doc:documentArrayList){
+                if (properties.isEmpty()){
+                    document = doc;
+                    properties = doc.getProperties();
+                }else {
+                    mergeFunction(properties,doc);
+                }
+            }
+            document.setProperties(properties);
+            return document;
+        }
+    }
+
+    protected void mergeFunction(Map<String, Object> properties,BaseDocument doc){
+        mergeFoundTime(properties,doc);
+    }
+
+    private void mergeFoundTime(Map<String, Object> properties,BaseDocument doc){
+        long firstFoundTime = Long.parseLong(properties.getOrDefault("FIRST_FOUND_TIME", 0L).toString());
+        long docFirstFoundTime = Long.parseLong(doc.getAttribute("FIRST_FOUND_TIME").toString());
+        properties.put("FIRST_FOUND_TIME", firstFoundTime < docFirstFoundTime ? firstFoundTime : docFirstFoundTime);
+
+        long lastFoundTime = Long.parseLong(properties.getOrDefault("LAST_FOUND_TIME", 0L).toString());
+        long docLastFoundTime = Long.parseLong(doc.getAttribute("LAST_FOUND_TIME").toString());
+        properties.put("LAST_FOUND_TIME", lastFoundTime > docLastFoundTime ? lastFoundTime : docLastFoundTime);
+    }
+}
diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/vertex/Fqdn.java b/ip-learning-java-test/src/main/java/cn/ac/iie/etl/vertex/Fqdn.java
new file mode 100644
index 0000000..7e2172d
--- /dev/null
+++ b/ip-learning-java-test/src/main/java/cn/ac/iie/etl/vertex/Fqdn.java
@@ -0,0 +1,20 @@
+package cn.ac.iie.etl.vertex;
+
+import cn.ac.iie.etl.update.Vertex;
+import cn.ac.iie.utils.ArangoDBConnect;
+import com.arangodb.entity.BaseDocument;
+import com.arangodb.entity.BaseEdgeDocument;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class Fqdn extends Vertex {
+
+    public Fqdn(HashMap<String, ArrayList<BaseDocument>> newDocumentHashMap,
+                ArangoDBConnect arangoManger,
+                String collectionName,
+                ConcurrentHashMap<String, BaseDocument> historyDocumentMap) {
+        super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap);
+    }
+}
diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/vertex/Ip.java b/ip-learning-java-test/src/main/java/cn/ac/iie/etl/vertex/Ip.java
new file mode 100644
index 0000000..e04cd96
--- /dev/null
+++ b/ip-learning-java-test/src/main/java/cn/ac/iie/etl/vertex/Ip.java
@@ -0,0 +1,20 @@
+package cn.ac.iie.etl.vertex;
+
+import cn.ac.iie.etl.update.Vertex;
+import cn.ac.iie.utils.ArangoDBConnect;
+import com.arangodb.entity.BaseDocument;
+import com.arangodb.entity.BaseEdgeDocument;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class Ip extends Vertex {
+
+    public Ip(HashMap<String, ArrayList<BaseDocument>> newDocumentHashMap,
+              ArangoDBConnect arangoManger,
+              String collectionName,
+              ConcurrentHashMap<String, BaseDocument> historyDocumentMap) {
+        super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap);
+    }
+}
diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java b/ip-learning-java-test/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java
index 8b6bf21..6427961 100644
--- a/ip-learning-java-test/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java
+++ b/ip-learning-java-test/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java
@@ -11,21 +11,15 @@ public class IpLearningApplicationTest {
 
     public static void main(String[] args) {
         long startA = System.currentTimeMillis();
-//        BaseArangoData.BaseVFqdnDataMap();
-//        BaseArangoData.BaseVIpDataMap();
-//        BaseArangoData.BaseEFqdnAddressIpDataMap();
-//        BaseArangoData.BaseEIpVisitFqdnDataMap();
+        BaseArangoData baseArangoData = new BaseArangoData();
+        baseArangoData.baseDocumentDataMap();
 
-//        ExecutorThreadPool.shutdown();
-//        ExecutorThreadPool.awaitThreadTask();
         long lastA = System.currentTimeMillis();
         System.out.println("读取ArangoDb时间:"+(lastA - startA));
 
-//        UpdateGraphsData.updateVFqdn();
-//        UpdateGraphsData.updateVIp();
-//        UpdateGraphsData.updateEFqdnAddressIp();
-//        UpdateGraphsData.updateEIpVisitFqdn();
+
+        /*
         long startC = System.currentTimeMillis();
 
         CountDownLatch countDownLatch = new CountDownLatch(4);
@@ -68,6 +62,8 @@ public class IpLearningApplicationTest {
 
         long lastC = System.currentTimeMillis();
         System.out.println("更新ArangoDb时间:"+(lastC - startC));
+        */
+
         System.out.println(BaseArangoData.v_Fqdn_Map.size());
         System.out.println(BaseArangoData.v_Ip_Map.size());
         System.out.println(BaseArangoData.e_Fqdn_Address_Ip_Map.size());
diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java b/ip-learning-java-test/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java
index f1c75f0..ddd0500 100644
---
a/ip-learning-java-test/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java @@ -5,10 +5,15 @@ import com.arangodb.ArangoCollection; import com.arangodb.ArangoCursor; import com.arangodb.ArangoDB; import com.arangodb.ArangoDatabase; +import com.arangodb.entity.DocumentCreateEntity; +import com.arangodb.entity.ErrorEntity; +import com.arangodb.entity.MultiDocumentEntity; import com.arangodb.model.AqlQueryOptions; +import com.arangodb.model.DocumentCreateOptions; import com.arangodb.util.MapBuilder; import java.util.ArrayList; +import java.util.Collection; import java.util.Map; public class ArangoDBConnect { @@ -62,6 +67,7 @@ public class ArangoDBConnect { } } + @Deprecated public void insertAndUpdate(ArrayList docInsert,ArrayList docUpdate,String collectionName){ ArangoDatabase database = getDatabase(); try { @@ -81,4 +87,27 @@ public class ArangoDBConnect { } } + public void overwrite(ArrayList docOverwrite,String collectionName){ + ArangoDatabase database = getDatabase(); + try { + ArangoCollection collection = database.collection(collectionName); + if (!docOverwrite.isEmpty()){ + DocumentCreateOptions documentCreateOptions = new DocumentCreateOptions(); + documentCreateOptions.overwrite(true); + documentCreateOptions.silent(true); + MultiDocumentEntity> documentCreateEntityMultiDocumentEntity = collection.insertDocuments(docOverwrite, documentCreateOptions); + Collection errors = documentCreateEntityMultiDocumentEntity.getErrors(); + for (ErrorEntity errorEntity:errors){ + System.out.println("写入arangoDB异常:"+errorEntity.getErrorMessage()); + } + } + }catch (Exception e){ + System.out.println("更新失败:"+e.toString()); + }finally { + docOverwrite.clear(); + } + } + + + } diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/utils/ExecutorThreadPool.java b/ip-learning-java-test/src/main/java/cn/ac/iie/utils/ExecutorThreadPool.java index ffd5b5a..29cc5a5 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/utils/ExecutorThreadPool.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/utils/ExecutorThreadPool.java @@ -29,7 +29,7 @@ public class ExecutorThreadPool { pool.execute(command); } - public static void awaitThreadTask(){ + public void awaitThreadTask(){ try { while (!pool.awaitTermination(ApplicationConfig.THREAD_AWAIT_TERMINATION_TIME, TimeUnit.SECONDS)) { System.out.println("线程池没有关闭"); @@ -39,7 +39,7 @@ public class ExecutorThreadPool { } } - public static void shutdown(){ + public void shutdown(){ pool.shutdown(); } diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/utils/TopDomainUtils.java b/ip-learning-java-test/src/main/java/cn/ac/iie/utils/TopDomainUtils.java new file mode 100644 index 0000000..b2823d1 --- /dev/null +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/utils/TopDomainUtils.java @@ -0,0 +1,156 @@ +package cn.ac.iie.utils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStreamReader; +import java.util.HashMap; + +public class TopDomainUtils { + private static Logger logger = LoggerFactory.getLogger(TopDomainUtils.class); + + + public static String getSecDomain(String urlDomain, HashMap> maps) { + String[] split = urlDomain.split("\\."); + String secDomain = null; + for (int i = split.length - 1; i >= 0; i--) { + int maps_index = split.length - (i + 1); + HashMap innerMap = maps.get("map_id_" + maps_index); + HashMap fullTop = maps.get("full"); + if 
(!(innerMap.containsKey(split[i]))) { + String strSec = ""; + for (int j = i; j < split.length; j++) { + strSec += (split[j] + "."); + } + secDomain = strSec.substring(0, strSec.length() - 1); + if (fullTop.containsKey(getTopDomainFromSecDomain(secDomain))) { + break; + } else { + while (!fullTop.containsKey(getTopDomainFromSecDomain(secDomain)) && getTopDomainFromSecDomain(secDomain).contains(".")) { + secDomain = getTopDomainFromSecDomain(secDomain); + } + break; + } + } + } + return secDomain; + } + + private static String getTopDomainFromSecDomain(String secDomain) { + String quFirstDian = secDomain; + if (secDomain.contains(".")) { + quFirstDian = secDomain.substring(secDomain.indexOf(".")).substring(1); + } + return quFirstDian; + } + + public static HashMap> readTopDomainFile(String filePath) { + HashMap> maps = makeHashMap(filePath); + try { + String encoding = "UTF-8"; + File file = new File(filePath); + if (file.isFile() && file.exists()) { + InputStreamReader read = new InputStreamReader( + new FileInputStream(file), encoding); + BufferedReader bufferedReader = new BufferedReader(read); + String lineTxt = null; + while ((lineTxt = bufferedReader.readLine()) != null) { + HashMap fullTop = maps.get("full"); + fullTop.put(lineTxt, lineTxt); + maps.put("full", fullTop); + String[] split = lineTxt.split("\\."); + for (int i = split.length - 1; i >= 0; i--) { + int maps_index = split.length - (i + 1); + HashMap innerMap = maps.get("map_id_" + maps_index); + innerMap.put(split[i], split[i]); + maps.put("map_id_" + maps_index, innerMap); + } + } + read.close(); + } else { + logger.error("TopDomainUtils>=>readTopDomainFile filePath is wrong--->{" + filePath + "}<---"); + } + } catch (Exception e) { + logger.error("TopDomainUtils>=>readTopDomainFile get filePathData error--->{" + e + "}<---"); + e.printStackTrace(); + } + return maps; + } + + private static int getMaxLength(String filePath) { + int lengthDomain = 0; + try { + String encoding = "UTF-8"; + File file = new File(filePath); + if (file.isFile() && file.exists()) { //判断文件是否存在 + InputStreamReader read = new InputStreamReader( + new FileInputStream(file), encoding);//考虑到编码格式 + BufferedReader bufferedReader = new BufferedReader(read); + String lineTxt = null; + while ((lineTxt = bufferedReader.readLine()) != null) { + String[] split = lineTxt.split("\\."); + if (split.length > lengthDomain) { + lengthDomain = split.length; + } + } + read.close(); + } else { + logger.error("TopDomainUtils>>getMaxLength filePath is wrong--->{" + filePath + "}<---"); + } + } catch (Exception e) { + logger.error("TopDomainUtils>=>getMaxLength get filePathData error--->{" + e + "}<---"); + e.printStackTrace(); + } + return lengthDomain; + } + + private static HashMap> makeHashMap(String filePath) { + int maxLength = getMaxLength(filePath); + HashMap> maps = new HashMap>(); + for (int i = 0; i < maxLength; i++) { + maps.put("map_id_" + i, new HashMap()); + } + maps.put("full", new HashMap()); + return maps; + } + + //通用方法,传入url,返回domain,这里的domain不包含端口号,含有:一定是v6 + public static String getDomainFromUrl(String oriUrl) { + //先按照?切分,排除后续干扰 + //后续操作不再涉及?号,排除http://在?后的情况 + String url = oriUrl.split("[?]")[0]; + //获取file_path与domain + if (url.contains("http://") || url.contains("https://")) { + //包含http://或者https://时 + //获取domain + if (url.split("//")[1].split("/")[0].split(":").length <= 2) { + //按照:切分后最终长度为1或2,说明是v4 + String v4Domain = url.split("//")[1]//按照//切分,索引1包含domain + .split("/")[0]//按照/切分,索引0包含domain + .split(":")[0];//v4按照:切分去除domain上的端口号后,索引0为最终域名 
+ return v4Domain; + } else { + //按照:切分后长度>2,说明是v6地址,v6地址不包含端口号(暂定),只需要先切分//再切分/ + String v6Domain = url.split("//")[1]//按照//切分,索引1包含domain + .split("/")[0];//v6按照/切分后索引0就是domain + return v6Domain; + } + } else { + //无http://或者https:// + //获取domain + if (url.split("/")[0].split(":").length <= 2) { + //按照:切分后长度为1或2,说明为v4 + //无http://时直接按照/切分,索引0包含域名domain,再按照":"切分,0索引就是domain + String v4Domain = url.split("/")[0].split(":")[0]; + return v4Domain; + } else { + //按照:切分后长度>2,说明为v6,v6地址不包含端口号(暂定),只需要切分/取索引0 + String v6Domain = url.split("/")[0]; + return v6Domain; + } + } + } +} diff --git a/ip-learning-java-test/src/main/resources/application.properties b/ip-learning-java-test/src/main/resources/application.properties index d6eb262..1b22db2 100644 --- a/ip-learning-java-test/src/main/resources/application.properties +++ b/ip-learning-java-test/src/main/resources/application.properties @@ -1,9 +1,9 @@ #arangoDB参数配置 -arangoDB.host=192.168.40.127 +arangoDB.host=192.168.40.182 arangoDB.port=8529 arangoDB.user=root arangoDB.password=111111 -arangoDB.DB.name=insert_iplearn_index +arangoDB.DB.name=ip-learning-test arangoDB.batch=100000 arangoDB.ttl=3600 diff --git a/ip-learning-java-test/src/test/java/cn/ac/iie/Test.java b/ip-learning-java-test/src/test/java/cn/ac/iie/Test.java new file mode 100644 index 0000000..96cf122 --- /dev/null +++ b/ip-learning-java-test/src/test/java/cn/ac/iie/Test.java @@ -0,0 +1,29 @@ +package cn.ac.iie; + +import java.lang.reflect.Array; +import java.util.ArrayList; + +public class Test { + public static void main(String args[]) throws Exception { + Tester test = new Tester(); + Tester[] tests = new Tester[0]; +// ArrayList testers = new ArrayList<>(); +// testers.add(test); + Class c1 = tests.getClass().getComponentType(); + Class c2 = Tester.class; + Class c3 = test.getClass(); + + System.out.println(c1.getName()); + System.out.println(c2.getName()); + System.out.println(c3.getName()); + +// Tester[] newTesters = (Tester[]) Array.newInstance(c1, 10); +// Tester newTester = (Tester) c1.newInstance(); +// System.out.println(newTesters.length); + } +} + +class Tester { + private String name; + private String mem; +} diff --git a/ip-learning-java-test/src/test/java/cn/ac/iie/TestArango.java b/ip-learning-java-test/src/test/java/cn/ac/iie/TestArango.java new file mode 100644 index 0000000..6893133 --- /dev/null +++ b/ip-learning-java-test/src/test/java/cn/ac/iie/TestArango.java @@ -0,0 +1,21 @@ +package cn.ac.iie; + +import cn.ac.iie.utils.ArangoDBConnect; +import com.arangodb.ArangoCursor; +import com.arangodb.ArangoDatabase; +import com.arangodb.entity.BaseEdgeDocument; + +import java.util.List; + +public class TestArango { + public static void main(String[] args) { + ArangoDBConnect instance = ArangoDBConnect.getInstance(); + String query = "FOR doc IN IP filter doc.FIRST_FOUND_TIME >= 1592996080 and doc.FIRST_FOUND_TIME <= 1593112913 RETURN doc"; + ArangoCursor baseEdgeDocuments = instance.executorQuery(query, BaseEdgeDocument.class); + while (baseEdgeDocuments.hasNext()){ + BaseEdgeDocument next = baseEdgeDocuments.next(); + System.out.println(next.toString()); + } + ArangoDBConnect.clean(); + } +}
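
// ---- Editor's usage sketch, not part of the patch ----
// Expected behaviour of TopDomainUtils.getDomainFromUrl per the branches
// above; a quick scratch check (class name hypothetical):
import cn.ac.iie.utils.TopDomainUtils;

public class TopDomainUtilsDemo {
    public static void main(String[] args) {
        // scheme, port, path and query string are all stripped
        System.out.println(TopDomainUtils.getDomainFromUrl("http://news.example.com:8080/a/b?q=1")); // news.example.com
        // without a scheme, the host is everything before the first '/'
        System.out.println(TopDomainUtils.getDomainFromUrl("example.com/path"));                     // example.com
        // more than two ':'-separated segments are treated as an IPv6 host and kept verbatim
        System.out.println(TopDomainUtils.getDomainFromUrl("2001:db8::1/index.html"));               // 2001:db8::1
    }
}
// ------------------------------------------------------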