diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java b/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java index 8012f85..12fc1bd 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java @@ -44,20 +44,16 @@ public class UpdateGraphData { long start = System.currentTimeMillis(); try { - updateDocument(newVertexFqdnMap, historyVertexFqdnMap, "FQDN", - Fqdn.class,BaseDocument.class, + updateDocument(newVertexFqdnMap, historyVertexFqdnMap, "FQDN", Fqdn.class,BaseDocument.class, ReadClickhouseData::getVertexFqdnSql,ReadClickhouseData::getVertexFqdnDocument); - updateDocument(newVertexIpMap,historyVertexIpMap,"IP", - Ip.class,BaseDocument.class, + updateDocument(newVertexIpMap,historyVertexIpMap,"IP", Ip.class,BaseDocument.class, ReadClickhouseData::getVertexIpSql,ReadClickhouseData::getVertexIpDocument); - updateDocument(newVertexSubscriberMap,historyVertexSubscriberMap,"SUBSCRIBER", - Subscriber.class,BaseDocument.class, + updateDocument(newVertexSubscriberMap,historyVertexSubscriberMap,"SUBSCRIBER", Subscriber.class,BaseDocument.class, ReadClickhouseData::getVertexSubscriberSql,ReadClickhouseData::getVertexSubscriberDocument); - updateDocument(newRelationFqdnAddressIpMap,historyRelationFqdnAddressIpMap,"R_LOCATE_FQDN2IP", - LocateFqdn2Ip.class,BaseEdgeDocument.class, + updateDocument(newRelationFqdnAddressIpMap,historyRelationFqdnAddressIpMap,"R_LOCATE_FQDN2IP", LocateFqdn2Ip.class,BaseEdgeDocument.class, ReadClickhouseData::getRelationshipFqdnAddressIpSql,ReadClickhouseData::getRelationFqdnAddressIpDocument); // updateDocument(newRelationIpVisitFqdnMap,historyRelationIpVisitFqdnMap,"R_VISIT_IP2FQDN", diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadClickhouseData.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadClickhouseData.java index b16be3b..86981fd 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadClickhouseData.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadClickhouseData.java @@ -264,8 +264,8 @@ public class ReadClickhouseData { public static String getVertexIpSql() { String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime; - String clientIpSql = "SELECT common_client_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_c2s_byte_num) as BYTES_SUM,groupUniqArray(2)(common_link_info) as common_link_info,'client' as ip_type FROM tsg_galaxy_v3.connection_record_log where " + where + " group by IP"; - String serverIpSql = "SELECT common_server_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_s2c_byte_num) as BYTES_SUM,groupUniqArray(2)(common_link_info) as common_link_info,'server' as ip_type FROM tsg_galaxy_v3.connection_record_log where " + where + " group by IP"; + String clientIpSql = "SELECT common_client_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_c2s_byte_num+common_s2c_byte_num) as BYTES_SUM,groupUniqArray(2)(common_link_info) as common_link_info,'client' as ip_type FROM tsg_galaxy_v3.connection_record_log where " + where + " group by IP"; + String serverIpSql = "SELECT common_server_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_c2s_byte_num+common_s2c_byte_num) as BYTES_SUM,groupUniqArray(2)(common_link_info) as common_link_info,'server' as ip_type FROM tsg_galaxy_v3.connection_record_log where " + where + " group by IP"; return "SELECT * FROM((" + clientIpSql + ") UNION ALL (" + serverIpSql + "))"; } diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadHistoryArangoData.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadHistoryArangoData.java index 8b51128..5214fc4 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadHistoryArangoData.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadHistoryArangoData.java @@ -50,6 +50,7 @@ public class ReadHistoryArangoData extends Thread { long s = System.currentTimeMillis(); ArangoCursor docs = arangoConnect.executorQuery(query, type); if (docs != null) { + ArrayList list = new ArrayList<>(); List baseDocuments = docs.asListRemaining(); int i = 0; for (T doc : baseDocuments) { @@ -58,9 +59,7 @@ public class ReadHistoryArangoData extends Thread { case "R_LOCATE_FQDN2IP": updateProtocolDocument(doc); deleteDistinctClientIpByTime(doc); - break; - case "R_VISIT_IP2FQDN": - updateProtocolDocument(doc); + list.add(doc); break; default: } @@ -69,6 +68,7 @@ public class ReadHistoryArangoData extends Thread { tmpMap.put(key, doc); i++; } + arangoConnect.overwrite(list,table); long l = System.currentTimeMillis(); LOG.info(query + "\n读取" + i + "条数据,运行时间:" + (l - s)); } @@ -99,10 +99,11 @@ public class ReadHistoryArangoData extends Thread { ArrayList distCipTs = (ArrayList) doc.getAttribute("DIST_CIP_TS"); distCipTs.add(currentHour - RECENT_COUNT_HOUR * 3600); Collections.sort(distCipTs); + Collections.reverse(distCipTs); int index = distCipTs.indexOf(currentHour - RECENT_COUNT_HOUR * 3600); String[] distCipArr = new String[index]; long[] disCipTsArr = new long[index]; - if (distCip.size() + 1 == distCipTs.size()){ + if (index != 0 && distCip.size() + 1 == distCipTs.size()){ for (int i = 0; i < index; i++) { distCipArr[i] = distCip.get(i); disCipTsArr[i] = distCipTs.get(i); diff --git a/IP-learning-graph/src/main/resources/application.properties b/IP-learning-graph/src/main/resources/application.properties index dd055d9..7293dc1 100644 --- a/IP-learning-graph/src/main/resources/application.properties +++ b/IP-learning-graph/src/main/resources/application.properties @@ -17,7 +17,7 @@ thread.await.termination.time=10 #读取clickhouse时间范围方式,0:读取过去一小时,1:指定时间范围 -time.limit.type=1 +time.limit.type=0 read.clickhouse.max.time=1596684142 read.clickhouse.min.time=1596425769 diff --git a/IP-learning-graph/src/test/java/cn/ac/iie/TestList.java b/IP-learning-graph/src/test/java/cn/ac/iie/TestList.java index d437595..2e5ac36 100644 --- a/IP-learning-graph/src/test/java/cn/ac/iie/TestList.java +++ b/IP-learning-graph/src/test/java/cn/ac/iie/TestList.java @@ -11,6 +11,7 @@ import java.util.List; public class TestList { public static void main(String[] args) { + /* ArangoDBConnect arangoConnect = ArangoDBConnect.getInstance(); ArangoCursor documents = arangoConnect.executorQuery("FOR doc IN R_LOCATE_FQDN2IP filter doc.FIRST_FOUND_TIME >= 1596080839 and doc.FIRST_FOUND_TIME <= 1596395473 RETURN doc", BaseEdgeDocument.class); List baseEdgeDocuments = documents.asListRemaining(); @@ -18,8 +19,8 @@ public class TestList { doc.updateAttribute("PROTOCOL_TYPE","123"); } +*/ - /* ArrayList integers = new ArrayList<>(); integers.add(10); integers.add(8); @@ -39,7 +40,9 @@ public class TestList { integers.add(5); Collections.sort(integers); System.out.println(integers); + Collections.reverse(integers); + System.out.println(integers); System.out.println(integers.indexOf(5)); - */ + } }