diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseArangoData.java b/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseArangoData.java index d90ee44..d711439 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseArangoData.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseArangoData.java @@ -3,7 +3,6 @@ package cn.ac.iie.dao; import cn.ac.iie.config.ApplicationConfig; import cn.ac.iie.service.read.ReadHistoryArangoData; import cn.ac.iie.utils.ArangoDBConnect; -import cn.ac.iie.utils.ClickhouseConnect; import cn.ac.iie.utils.ExecutorThreadPool; import com.arangodb.ArangoCursor; import com.arangodb.entity.BaseDocument; @@ -11,8 +10,6 @@ import com.arangodb.entity.BaseEdgeDocument; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Enumeration; import java.util.concurrent.ConcurrentHashMap; /** @@ -37,9 +34,9 @@ public class BaseArangoData { readHistoryData("FQDN", v_Fqdn_Map,BaseDocument.class); readHistoryData("IP", v_Ip_Map,BaseDocument.class); readHistoryData("SUBSCRIBER",v_Subscriber_Map,BaseDocument.class); -// readHistoryData("R_LOCATE_FQDN2IP", e_Fqdn_Address_Ip_Map); -// readHistoryData("R_VISIT_IP2FQDN", e_Ip_Visit_Fqdn_Map); -// readHistoryData("R_LOCATE_SUBSCRIBER2IP",e_Subsciber_Locate_Ip_Map); + readHistoryData("R_LOCATE_FQDN2IP", e_Fqdn_Address_Ip_Map,BaseEdgeDocument.class); + readHistoryData("R_VISIT_IP2FQDN", e_Ip_Visit_Fqdn_Map,BaseEdgeDocument.class); + readHistoryData("R_LOCATE_SUBSCRIBER2IP",e_Subsciber_Locate_Ip_Map,BaseEdgeDocument.class); threadPool.shutdown(); threadPool.awaitThreadTask(); LOG.info("v_Fqdn_Map大小:"+v_Fqdn_Map.size()); @@ -52,23 +49,12 @@ public class BaseArangoData { LOG.info("读取ArangoDb时间:"+(lastA - startA)); } - public static void main(String[] args) { - new BaseArangoData().readHistoryData("IP", v_Ip_Map,BaseDocument.class); - threadPool.shutdown(); - threadPool.awaitThreadTask(); - ArrayList baseEdgeDocuments = new ArrayList<>(); - Enumeration keys = v_Ip_Map.keys(); - arangoDBConnect.overwrite(baseEdgeDocuments,"IP"); - arangoDBConnect.clean(); - - } - private void readHistoryData(String table, ConcurrentHashMap map, Class type){ try { long[] timeRange = getTimeRange(table); for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { String sql = getQuerySql(timeRange, i, table); - ReadHistoryArangoData readHistoryArangoData = new ReadHistoryArangoData<>(arangoDBConnect, sql, map,type); + ReadHistoryArangoData readHistoryArangoData = new ReadHistoryArangoData<>(arangoDBConnect, sql, map,type,table); threadPool.executor(readHistoryArangoData); } }catch (Exception e){ diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/dao/UpdateGraphData.java b/ip-learning-java-test/src/main/java/cn/ac/iie/dao/UpdateGraphData.java index 825543b..e3641a0 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/dao/UpdateGraphData.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/dao/UpdateGraphData.java @@ -26,29 +26,18 @@ public class UpdateGraphData { private static ExecutorThreadPool pool = ExecutorThreadPool.getInstance(); private static ArangoDBConnect arangoManger = ArangoDBConnect.getInstance(); + private static BaseClickhouseData baseClickhouseData = new BaseClickhouseData(); private CountDownLatch countDownLatch; public void updateArango(){ long startC = System.currentTimeMillis(); try { - BaseClickhouseData baseClickhouseData = new BaseClickhouseData(); - baseClickhouseData.baseVertexFqdn(); updateVertexFqdn(); - - baseClickhouseData.baseVertexIp(); updateVertexIp(); - -// baseClickhouseData.baseRelationshipFqdnAddressIp(); -// updateRelationFqdnAddressIp(); - -// baseClickhouseData.baseRelationshipIpVisitFqdn(); -// updateRelationIpVisitFqdn(); - - baseClickhouseData.baseVertexSubscriber(); + updateRelationFqdnAddressIp(); + updateRelationIpVisitFqdn(); updateVertexSubscriber(); - -// baseClickhouseData.baseRelationshipSubscriberLocateIp(); -// updateRelationshipSubsciberLocateIp(); + updateRelationshipSubsciberLocateIp(); }catch (Exception e){ e.printStackTrace(); }finally { @@ -59,6 +48,7 @@ public class UpdateGraphData { } private void updateVertexFqdn(){ + baseClickhouseData.baseVertexFqdn(); try { countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { @@ -75,6 +65,7 @@ public class UpdateGraphData { } private void updateVertexSubscriber(){ + baseClickhouseData.baseVertexSubscriber(); try { countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { @@ -91,6 +82,7 @@ public class UpdateGraphData { } private void updateRelationshipSubsciberLocateIp(){ + baseClickhouseData.baseRelationshipSubscriberLocateIp(); try { countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { @@ -107,6 +99,7 @@ public class UpdateGraphData { } private void updateVertexIp(){ + baseClickhouseData.baseVertexIp(); try { countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { @@ -123,6 +116,7 @@ public class UpdateGraphData { } private void updateRelationFqdnAddressIp(){ + baseClickhouseData.baseRelationshipFqdnAddressIp(); try { countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { @@ -139,6 +133,7 @@ public class UpdateGraphData { } private void updateRelationIpVisitFqdn(){ + baseClickhouseData.baseRelationshipIpVisitFqdn(); try { countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/read/ReadClickhouseData.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/read/ReadClickhouseData.java index 7c14b63..baef520 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/service/read/ReadClickhouseData.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/read/ReadClickhouseData.java @@ -10,6 +10,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.regex.Pattern; /** @@ -22,6 +23,16 @@ public class ReadClickhouseData { private static Pattern pattern = Pattern.compile("^[\\d]*$"); private static final Logger LOG = LoggerFactory.getLogger(ReadClickhouseData.class); + + public static HashSet protocolSet; + + static { + protocolSet = new HashSet<>(); + protocolSet.add("HTTP"); + protocolSet.add("TLS"); + protocolSet.add("DNS"); + } + public static BaseDocument getVertexFqdnDocument(ResultSet resultSet) throws SQLException { String fqdnName = resultSet.getString("FQDN"); BaseDocument newDoc = null; @@ -107,6 +118,7 @@ public class ReadClickhouseData { long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); long countTotal = resultSet.getLong("COUNT_TOTAL"); + String schemaType = resultSet.getString("schema_type"); String[] distCipRecents = (String[]) resultSet.getArray("DIST_CIP_RECENT").getArray(); long[] clientIpTs = new long[distCipRecents.length]; for (int i = 0; i < clientIpTs.length; i++) { @@ -120,10 +132,14 @@ public class ReadClickhouseData { newDoc.setTo("IP/" + vIp); newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime); newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime); - newDoc.addAttribute("COUNT_TOTAL", countTotal); newDoc.addAttribute("DIST_CIP", distCipRecents); newDoc.addAttribute("DIST_CIP_TS", clientIpTs); + initSchemaProperty(newDoc); + + if (protocolSet.contains(schemaType)){ + checkSchemaProperty(newDoc, schemaType, countTotal); + } } return newDoc; } @@ -137,6 +153,7 @@ public class ReadClickhouseData { long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); long countTotal = resultSet.getLong("COUNT_TOTAL"); + String schemaType = resultSet.getString("schema_type"); newDoc = new BaseEdgeDocument(); newDoc.setKey(key); @@ -144,19 +161,24 @@ public class ReadClickhouseData { newDoc.setTo("FQDN/" + vFqdn); newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime); newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime); - newDoc.addAttribute("COUNT_TOTAL", countTotal); + + initSchemaProperty(newDoc); + + if (protocolSet.contains(schemaType)){ + checkSchemaProperty(newDoc, schemaType, countTotal); + } } return newDoc; } - public static void putMapByHashcode(T newDoc, HashMap>> map){ + public static void putMapByHashcode(T newDoc, HashMap>> map) { if (newDoc != null) { String key = newDoc.getKey(); int i = Math.abs(key.hashCode()) % ApplicationConfig.THREAD_POOL_NUMBER; HashMap> documentHashMap = map.getOrDefault(i, new HashMap<>()); ArrayList documentArrayList = documentHashMap.getOrDefault(key, new ArrayList<>()); documentArrayList.add(newDoc); - documentHashMap.put(key,documentArrayList); + documentHashMap.put(key, documentArrayList); } } @@ -183,6 +205,26 @@ public class ReadClickhouseData { return false; } + + private static void checkSchemaProperty(BaseEdgeDocument newDoc, String schema, long countTotal) { + long[] recentCnt = new long[24]; + recentCnt[0] = countTotal; + String protocolRecent = schema +"_CNT_RECENT"; + String protocolTotal = schema + "_CNT_TOTAL"; + newDoc.updateAttribute(protocolTotal, countTotal); + newDoc.updateAttribute(protocolRecent, recentCnt); + newDoc.addAttribute("PROTOCOL_TYPE", schema); + } + + private static void initSchemaProperty(BaseEdgeDocument newDoc){ + newDoc.addAttribute("HTTP_CNT_TOTAL", 0L); + newDoc.addAttribute("HTTP_CNT_RECENT", new long[24]); + newDoc.addAttribute("TLS_CNT_TOTAL", 0L); + newDoc.addAttribute("TLS_CNT_RECENT", new long[24]); + newDoc.addAttribute("DNS_CNT_TOTAL", 0L); + newDoc.addAttribute("DNS_CNT_RECENT", new long[24]); + } + public static String getVertexFqdnSql() { long[] timeLimit = getTimeLimit(); long maxTime = timeLimit[0]; @@ -200,7 +242,6 @@ public class ReadClickhouseData { String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime; String clientIpSql = "SELECT common_client_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_c2s_byte_num) as BYTES_SUM,'client' as ip_type FROM tsg_galaxy_v3.connection_record_log where " + where + " group by IP"; String serverIpSql = "SELECT common_server_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_s2c_byte_num) as BYTES_SUM,'server' as ip_type FROM tsg_galaxy_v3.connection_record_log where " + where + " group by IP"; - String frameIpSql = ""; return "SELECT * FROM((" + clientIpSql + ") UNION ALL (" + serverIpSql + "))"; } @@ -209,8 +250,8 @@ public class ReadClickhouseData { long maxTime = timeLimit[0]; long minTime = timeLimit[1]; String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime; - String sslSql = "SELECT ssl_sni AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,groupUniqArray(100)(common_client_ip) AS DIST_CIP_RECENT,'SSL' AS common_schema_type FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'SSL' GROUP BY ssl_sni,common_server_ip"; - String httpSql = "SELECT http_host AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,groupUniqArray(100)(common_client_ip) AS DIST_CIP_RECENT,'HTTP' AS common_schema_type FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'HTTP' GROUP BY http_host,common_server_ip"; + String sslSql = "SELECT ssl_sni AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,groupUniqArray(100)(common_client_ip) AS DIST_CIP_RECENT,'TLS' AS schema_type FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'SSL' GROUP BY ssl_sni,common_server_ip"; + String httpSql = "SELECT http_host AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,groupUniqArray(100)(common_client_ip) AS DIST_CIP_RECENT,'HTTP' AS schema_type FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'HTTP' GROUP BY http_host,common_server_ip"; return "SELECT * FROM ((" + sslSql + ") UNION ALL (" + httpSql + "))WHERE FQDN != ''"; } @@ -219,8 +260,8 @@ public class ReadClickhouseData { long maxTime = timeLimit[0]; long minTime = timeLimit[1]; String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime; - String httpSql = "SELECT http_host AS FQDN,common_client_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,'HTTP' AS common_schema_type FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'HTTP' GROUP BY http_host,common_client_ip"; - String sslSql = "SELECT ssl_sni AS FQDN,common_client_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,'SSL' AS common_schema_type FROM tsg_galaxy_v3.connection_record_log WHERE common_schema_type = 'SSL' GROUP BY ssl_sni,common_client_ip"; + String httpSql = "SELECT http_host AS FQDN,common_client_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,'HTTP' AS schema_type FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'HTTP' GROUP BY http_host,common_client_ip"; + String sslSql = "SELECT ssl_sni AS FQDN,common_client_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,'TLS' AS schema_type FROM tsg_galaxy_v3.connection_record_log WHERE common_schema_type = 'SSL' GROUP BY ssl_sni,common_client_ip"; return "SELECT * FROM ((" + sslSql + ") UNION ALL (" + httpSql + "))WHERE FQDN != ''"; } diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java index 138bae3..e0aec45 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java @@ -3,12 +3,10 @@ package cn.ac.iie.service.read; import cn.ac.iie.utils.ArangoDBConnect; import com.arangodb.ArangoCursor; import com.arangodb.entity.BaseDocument; -import com.arangodb.entity.BaseEdgeDocument; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.reflect.ParameterizedType; -import java.lang.reflect.Type; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; @@ -19,32 +17,58 @@ import java.util.concurrent.ConcurrentHashMap; public class ReadHistoryArangoData extends Thread { private static final Logger LOG = LoggerFactory.getLogger(ReadHistoryArangoData.class); - private ArangoDBConnect arangoDBConnect; + private ArangoDBConnect arangoConnect; private String query; private ConcurrentHashMap map; private Class type; + private String table; - public ReadHistoryArangoData(ArangoDBConnect arangoDBConnect, String query, ConcurrentHashMap map,Class type) { - this.arangoDBConnect = arangoDBConnect; + public ReadHistoryArangoData(ArangoDBConnect arangoConnect, String query, ConcurrentHashMap map, Class type, String table) { + this.arangoConnect = arangoConnect; this.query = query; this.map = map; this.type = type; + this.table = table; } @Override public void run() { long s = System.currentTimeMillis(); - ArangoCursor docs = arangoDBConnect.executorQuery(query, type); - if (docs != null){ + ArangoCursor docs = arangoConnect.executorQuery(query, type); + if (docs != null) { List baseDocuments = docs.asListRemaining(); int i = 0; for (T doc : baseDocuments) { String key = doc.getKey(); + switch (table) { + case "R_LOCATE_FQDN2IP": + updateProtocolDocument(doc); + break; + case "R_VISIT_IP2FQDN": + updateProtocolDocument(doc); + break; + default: + } map.put(key, doc); i++; } long l = System.currentTimeMillis(); - LOG.info(query+ "\n处理数据" + i + "条,运行时间:" + (l - s)); + LOG.info(query + "\n处理数据" + i + "条,运行时间:" + (l - s)); } } + + private void updateProtocolDocument(T doc) { + if (doc.getProperties().containsKey("PROTOCOL_TYPE")) { + for (String protocol : ReadClickhouseData.protocolSet) { + String protocolRecent = protocol + "_CNT_RECENT"; + ArrayList cntRecent = (ArrayList) doc.getAttribute(protocolRecent); + Long[] cntRecentsSrc = cntRecent.toArray(new Long[cntRecent.size()]); + Long[] cntRecentsDst = new Long[24]; + System.arraycopy(cntRecentsSrc, 0, cntRecentsDst, 1, cntRecentsSrc.length - 1); + cntRecentsDst[0] = 0L; + doc.addAttribute(protocolRecent, cntRecentsDst); + } + } + } + } diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/relationship/LocateFqdn2Ip.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/relationship/LocateFqdn2Ip.java index f9e3b88..0515381 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/service/relationship/LocateFqdn2Ip.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/relationship/LocateFqdn2Ip.java @@ -22,11 +22,15 @@ public class LocateFqdn2Ip extends Relationship { @Override protected void mergeFunction(Map properties, BaseEdgeDocument schemaEdgeDoc){ super.mergeFunction(properties,schemaEdgeDoc); + mergeProtocol(properties, schemaEdgeDoc); } @Override protected void updateFunction(BaseEdgeDocument newEdgeDocument, BaseEdgeDocument historyEdgeDocument) { super.updateFunction(newEdgeDocument, historyEdgeDocument); + updateProcotol(historyEdgeDocument,"TLS",newEdgeDocument); + updateProcotol(historyEdgeDocument,"HTTP",newEdgeDocument); + updateProcotol(historyEdgeDocument,"DNS",newEdgeDocument); updateDistinctClientIp(newEdgeDocument, historyEdgeDocument); } diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/relationship/LocateSubscriber2Ip.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/relationship/LocateSubscriber2Ip.java index c9b63db..d5e60b9 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/service/relationship/LocateSubscriber2Ip.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/relationship/LocateSubscriber2Ip.java @@ -6,7 +6,6 @@ import com.arangodb.entity.BaseEdgeDocument; import java.util.ArrayList; import java.util.HashMap; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -19,14 +18,4 @@ public class LocateSubscriber2Ip extends Relationship { CountDownLatch countDownLatch) { super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap, countDownLatch); } - - @Override - protected BaseEdgeDocument mergeRelationship(ArrayList newEdgeDocumentSchemaMap) { - return super.mergeRelationship(newEdgeDocumentSchemaMap); - } - - @Override - protected void updateFunction(BaseEdgeDocument newEdgeDocument, BaseEdgeDocument historyEdgeDocument) { - super.updateFoundTime(newEdgeDocument,historyEdgeDocument); - } } diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/relationship/VisitIp2Fqdn.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/relationship/VisitIp2Fqdn.java index f5b5e3d..f6df715 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/service/relationship/VisitIp2Fqdn.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/relationship/VisitIp2Fqdn.java @@ -6,6 +6,7 @@ import com.arangodb.entity.BaseEdgeDocument; import java.util.ArrayList; import java.util.HashMap; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -17,4 +18,18 @@ public class VisitIp2Fqdn extends Relationship { CountDownLatch countDownLatch) { super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap,countDownLatch); } + + @Override + protected void updateFunction(BaseEdgeDocument newEdgeDocument, BaseEdgeDocument historyEdgeDocument) { + super.updateFunction(newEdgeDocument, historyEdgeDocument); + updateProcotol(historyEdgeDocument,"TLS",newEdgeDocument); + updateProcotol(historyEdgeDocument,"HTTP",newEdgeDocument); + updateProcotol(historyEdgeDocument,"DNS",newEdgeDocument); + } + + @Override + protected void mergeFunction(Map newProperties, BaseEdgeDocument lastDoc) { + super.mergeFunction(newProperties, lastDoc); + mergeProtocol(newProperties, lastDoc); + } } diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Document.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Document.java index ceaf7fa..1c9203d 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Document.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Document.java @@ -89,29 +89,29 @@ public class Document extends Thread{ }else if (newDocumentSchemaList.size() == 1){ return newDocumentSchemaList.get(0); }else { - T document = type.newInstance(); - Map properties = document.getProperties(); + T newDocument = type.newInstance(); + Map newProperties = newDocument.getProperties(); for (T doc:newDocumentSchemaList){ - if (properties.isEmpty()){ - document = doc; - properties = doc.getProperties(); + if (newProperties.isEmpty()){ + newDocument = doc; + newProperties = doc.getProperties(); }else { - mergeFunction(properties,doc); + mergeFunction(newProperties,doc); } } - document.setProperties(properties); - return document; + newDocument.setProperties(newProperties); + return newDocument; } } - protected void mergeFunction(Map properties, T doc) { - long firstFoundTime = Long.parseLong(properties.getOrDefault("FIRST_FOUND_TIME", 0L).toString()); - long docFirstFoundTime = Long.parseLong(doc.getAttribute("FIRST_FOUND_TIME").toString()); - properties.put("FIRST_FOUND_TIME",firstFoundTime newProperties, T lastDoc) { + long firstFoundTime = Long.parseLong(newProperties.getOrDefault("FIRST_FOUND_TIME", 0L).toString()); + long docFirstFoundTime = Long.parseLong(lastDoc.getAttribute("FIRST_FOUND_TIME").toString()); + newProperties.put("FIRST_FOUND_TIME",firstFoundTimedocLastFoundTime? lastFoundTime:docLastFoundTime); + long lastFoundTime = Long.parseLong(newProperties.getOrDefault("LAST_FOUND_TIME", 0L).toString()); + long docLastFoundTime = Long.parseLong(lastDoc.getAttribute("LAST_FOUND_TIME").toString()); + newProperties.put("LAST_FOUND_TIME",lastFoundTime>docLastFoundTime? lastFoundTime:docLastFoundTime); } diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Relationship.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Relationship.java index d1172dd..29e6ec2 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Relationship.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Relationship.java @@ -1,158 +1,78 @@ package cn.ac.iie.service.update; -import cn.ac.iie.config.ApplicationConfig; +import cn.ac.iie.service.read.ReadClickhouseData; import cn.ac.iie.utils.ArangoDBConnect; import com.arangodb.entity.BaseEdgeDocument; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; -public class Relationship extends Thread { - private static final Logger LOG = LoggerFactory.getLogger(Relationship.class); - - private HashMap> newDocumentHashMap; - private ArangoDBConnect arangoManger; - private String collectionName; - private ConcurrentHashMap historyDocumentMap; - private CountDownLatch countDownLatch; +public class Relationship extends Document { public Relationship(HashMap> newDocumentHashMap, ArangoDBConnect arangoManger, String collectionName, ConcurrentHashMap historyDocumentMap, CountDownLatch countDownLatch) { - this.newDocumentHashMap = newDocumentHashMap; - this.arangoManger = arangoManger; - this.collectionName = collectionName; - this.historyDocumentMap = historyDocumentMap; - this.countDownLatch = countDownLatch; + super(newDocumentHashMap,arangoManger,collectionName,historyDocumentMap,countDownLatch,BaseEdgeDocument.class); } @Override - public void run() { - Set keySet = newDocumentHashMap.keySet(); - ArrayList docInsert = new ArrayList<>(); - int i = 0; - try { - for (String key : keySet) { - ArrayList newEdgeDocumentSchemaList = newDocumentHashMap.getOrDefault(key, null); - if (newEdgeDocumentSchemaList != null) { - BaseEdgeDocument newEdgeDocument = mergeRelationship(newEdgeDocumentSchemaList); - i += 1; - BaseEdgeDocument historyEdgeDocument = historyDocumentMap.getOrDefault(key, null); - updateRelationship(newEdgeDocument,historyEdgeDocument,docInsert); - if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) { - arangoManger.overwrite(docInsert, collectionName); - LOG.info("更新"+collectionName+":" + i); - i = 0; - } - } - } - if (i != 0) { - arangoManger.overwrite(docInsert, collectionName); - LOG.info("更新"+collectionName+":" + i); - } - } catch (Exception e) { - e.printStackTrace(); - LOG.error(e.toString()); - }finally { - countDownLatch.countDown(); - } + protected void updateFunction(BaseEdgeDocument newEdgeDocument, BaseEdgeDocument historyEdgeDocument){ + super.updateFunction(newEdgeDocument,historyEdgeDocument); } - protected BaseEdgeDocument mergeRelationship(ArrayList newEdgeDocumentSchemaMap) { - return new BaseEdgeDocument(); - } - - private void updateRelationship(BaseEdgeDocument newEdgeDocument,BaseEdgeDocument historyEdgeDocument,ArrayList docInsert){ - if (historyEdgeDocument != null && newEdgeDocument != null) { - updateFunction(newEdgeDocument, historyEdgeDocument); - docInsert.add(historyEdgeDocument); - } else { - docInsert.add(newEdgeDocument); - } - } - - protected void updateFunction(BaseEdgeDocument newEdgeDocument,BaseEdgeDocument historyEdgeDocument){ - updateFoundTime(newEdgeDocument,historyEdgeDocument); - setSchemaCntByHistory(historyEdgeDocument,"TLS_CNT_RECENT","TLS_CNT_TOTAL",newEdgeDocument); - setSchemaCntByHistory(historyEdgeDocument,"HTTP_CNT_RECENT","HTTP_CNT_TOTAL",newEdgeDocument); - setSchemaCntByHistory(historyEdgeDocument,"DNS_CNT_RECENT","DNS_CNT_TOTAL",newEdgeDocument); - } - - protected void updateFoundTime(BaseEdgeDocument newEdgeDocument,BaseEdgeDocument historyEdgeDocument){ - Object lastFoundTime = newEdgeDocument.getAttribute("LAST_FOUND_TIME"); - historyEdgeDocument.addAttribute("LAST_FOUND_TIME", lastFoundTime); - } - - private void setSchemaCntByHistory(BaseEdgeDocument historyEdgeDocument,String schema,String totalSchema,BaseEdgeDocument newEdgeDocument){ + protected void updateProcotol(BaseEdgeDocument historyEdgeDocument, String schema, BaseEdgeDocument newEdgeDocument){ + String recentSchema = schema +"_CNT_RECENT"; + String totalSchema = schema + "_CNT_TOTAL"; long countTotal = Long.parseLong(newEdgeDocument.getAttribute(totalSchema).toString()); - long updateCountTotal = Long.parseLong(historyEdgeDocument.getAttribute(totalSchema).toString()); + if (countTotal > 0L){ + long updateCountTotal = Long.parseLong(historyEdgeDocument.getAttribute(totalSchema).toString()); - ArrayList cntRecent = (ArrayList) historyEdgeDocument.getAttribute(schema); - Long[] cntRecentsSrc = cntRecent.toArray(new Long[cntRecent.size()]); - Long[] cntRecentsDst = new Long[24]; - System.arraycopy(cntRecentsSrc, 0, cntRecentsDst, 1, cntRecentsSrc.length - 1); - cntRecentsDst[0] = countTotal; + Long[] cntRecent = (Long[]) historyEdgeDocument.getAttribute(recentSchema); + cntRecent[0] = countTotal; - historyEdgeDocument.addAttribute(schema, cntRecentsDst); - historyEdgeDocument.addAttribute(totalSchema, countTotal + updateCountTotal); - } - - protected void mergeFunction(Map properties, BaseEdgeDocument schemaEdgeDoc) { - mergeFoundTime(properties, schemaEdgeDoc); - } - - private void mergeFoundTime(Map properties, BaseEdgeDocument schemaEdgeDoc) { - long schemaFirstFoundTime = Long.parseLong(schemaEdgeDoc.getAttribute("FIRST_FOUND_TIME").toString()); - long firstFoundTime = Long.parseLong(properties.get("FIRST_FOUND_TIME").toString()); - properties.put("FIRST_FOUND_TIME", schemaFirstFoundTime < firstFoundTime ? schemaFirstFoundTime : firstFoundTime); - long schemaLastFoundTime = Long.parseLong(schemaEdgeDoc.getAttribute("LAST_FOUND_TIME").toString()); - long lastFoundTime = Long.parseLong(properties.get("LAST_FOUND_TIME").toString()); - properties.put("LAST_FOUND_TIME", schemaLastFoundTime > lastFoundTime ? schemaLastFoundTime : lastFoundTime); - } - - private void setSchemaCount(String schema, BaseEdgeDocument schemaEdgeDoc, Map properties) { - switch (schema) { - case "HTTP": - long httpCntTotal = Long.parseLong(schemaEdgeDoc.getAttribute("COUNT_TOTAL").toString()); - properties.put("HTTP_CNT_TOTAL", httpCntTotal); - long[] httpCntRecentsDst = new long[24]; - httpCntRecentsDst[0] = httpCntTotal; - properties.put("HTTP_CNT_RECENT", httpCntRecentsDst); - break; - case "SSL": - long tlsCntTotal = Long.parseLong(schemaEdgeDoc.getAttribute("COUNT_TOTAL").toString()); - properties.put("TLS_CNT_TOTAL", tlsCntTotal); - long[] tlsCntRecentsDst = new long[24]; - tlsCntRecentsDst[0] = tlsCntTotal; - properties.put("TLS_CNT_RECENT", tlsCntRecentsDst); - break; - default: - break; + historyEdgeDocument.addAttribute(recentSchema, cntRecent); + historyEdgeDocument.addAttribute(totalSchema, countTotal + updateCountTotal); + String hisProtocolType = historyEdgeDocument.getAttribute("PROTOCOL_TYPE").toString(); + if (!hisProtocolType.contains(schema)){ + hisProtocolType = hisProtocolType + "," + schema; + historyEdgeDocument.addAttribute("PROTOCOL_TYPE",hisProtocolType); + } } } - private void checkSchemaProperty(Map properties){ - if (!properties.containsKey("TLS_CNT_TOTAL")){ - properties.put("TLS_CNT_TOTAL",0L); - properties.put("TLS_CNT_RECENT",new long[24]); - } - if (!properties.containsKey("HTTP_CNT_TOTAL")){ - properties.put("HTTP_CNT_TOTAL",0L); - properties.put("HTTP_CNT_RECENT",new long[24]); - } - if (!properties.containsKey("DNS_CNT_TOTAL")){ - properties.put("DNS_CNT_TOTAL",0L); - properties.put("DNS_CNT_RECENT",new long[24]); + @Override + protected void mergeFunction(Map newProperties, BaseEdgeDocument lastDoc) { + super.mergeFunction(newProperties, lastDoc); + } + + protected void mergeProtocol(Map newProperties, BaseEdgeDocument lastDoc) { + String schema = lastDoc.getAttribute("PROTOCOL_TYPE").toString(); + if (ReadClickhouseData.protocolSet.contains(schema)){ + setProtocolProperties(schema,newProperties,lastDoc); } } + private void setProtocolProperties(String protocol,Map newProperties, BaseEdgeDocument lastDoc){ + String protocolRecent = protocol +"_CNT_RECENT"; + String protocolTotal = protocol + "_CNT_TOTAL"; + long httpCntTotal = Long.parseLong(lastDoc.getAttribute(protocolTotal).toString()); + newProperties.put(protocolTotal, httpCntTotal); + long[] httpCntRecents = (long[]) lastDoc.getAttribute(protocolRecent); + newProperties.put(protocolRecent, httpCntRecents); + String protocolType = newProperties.get("PROTOCOL_TYPE").toString(); + newProperties.put("PROTOCOL_TYPE",addProcotolType(protocolType,protocol)); + } + + private String addProcotolType(String protocolType,String schema){ + if (!protocolType.contains(schema)){ + protocolType = protocolType + "," + schema; + } + return protocolType; + } } diff --git a/ip-learning-java-test/src/main/resources/application.properties b/ip-learning-java-test/src/main/resources/application.properties index 96c5b3b..61aca8d 100644 --- a/ip-learning-java-test/src/main/resources/application.properties +++ b/ip-learning-java-test/src/main/resources/application.properties @@ -9,7 +9,7 @@ arangoDB.ttl=3600 update.arango.batch=10000 -thread.pool.number=5 +thread.pool.number=10 thread.await.termination.time=10 read.clickhouse.max.time=1594809098 diff --git a/ip-learning-java-test/src/test/java/cn/ac/iie/TestArango.java b/ip-learning-java-test/src/test/java/cn/ac/iie/TestArango.java index be52053..f43eece 100644 --- a/ip-learning-java-test/src/test/java/cn/ac/iie/TestArango.java +++ b/ip-learning-java-test/src/test/java/cn/ac/iie/TestArango.java @@ -5,17 +5,33 @@ import com.arangodb.ArangoCursor; import com.arangodb.ArangoDatabase; import com.arangodb.entity.BaseEdgeDocument; +import java.util.ArrayList; import java.util.List; public class TestArango { public static void main(String[] args) { ArangoDBConnect instance = ArangoDBConnect.getInstance(); + /* String query = "FOR doc IN IP filter doc.FIRST_FOUND_TIME >= 1592996080 and doc.FIRST_FOUND_TIME <= 1593112913 RETURN doc"; ArangoCursor baseEdgeDocuments = instance.executorQuery(query, BaseEdgeDocument.class); while (baseEdgeDocuments.hasNext()){ BaseEdgeDocument next = baseEdgeDocuments.next(); System.out.println(next.toString()); } -// ArangoDBConnect.clean(); + */ + BaseEdgeDocument baseEdgeDocument = new BaseEdgeDocument(); + baseEdgeDocument.setKey("192.168.50.6-www.liftopia.com"); + baseEdgeDocument.setFrom("IP/192.168.50.6"); + baseEdgeDocument.setTo("FQDN/www.liftopia.com"); + baseEdgeDocument.addAttribute("HTTP_CNT_TOTAL",3L); + baseEdgeDocument.addAttribute("DNS_CNT_RECENT",new long[24]); + baseEdgeDocument.addAttribute("PROTOCOL_TYPE","HTTP"); + + ArrayList baseEdgeDocuments = new ArrayList<>(); + + baseEdgeDocuments.add(baseEdgeDocument); + instance.overwrite(baseEdgeDocuments,"R_LOCATE_FQDN2IP"); + + instance.clean(); } }