diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseArangoData.java b/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseArangoData.java index c431d9b..fc7bf83 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseArangoData.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseArangoData.java @@ -24,6 +24,7 @@ public class BaseArangoData { public static ConcurrentHashMap historyVertexSubscriberMap = new ConcurrentHashMap<>(); public static ConcurrentHashMap historyRelationFqdnAddressIpMap = new ConcurrentHashMap<>(); public static ConcurrentHashMap historyRelationIpVisitFqdnMap = new ConcurrentHashMap<>(); + public static ConcurrentHashMap historyRelationFqdnSameFqdnMap = new ConcurrentHashMap<>(); public static ConcurrentHashMap historyRelationSubsciberLocateIpMap = new ConcurrentHashMap<>(); private static ArangoDBConnect arangoDBConnect = ArangoDBConnect.getInstance(); diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java b/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java index 2fdc967..49fde14 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java @@ -29,6 +29,7 @@ public class BaseClickhouseData { static HashMap>> newRelationFqdnAddressIpMap = new HashMap<>(); static HashMap>> newRelationIpVisitFqdnMap = new HashMap<>(); static HashMap>> newRelationSubsciberLocateIpMap = new HashMap<>(); + static HashMap>> newRelationFqdnSameFqdnMap = new HashMap<>(); private DruidPooledConnection connection; private Statement statement; @@ -148,6 +149,28 @@ public class BaseClickhouseData { } } + void baseRelationshipFqdnSameFqdn(){ + initializeMap(newRelationFqdnSameFqdnMap); + LOG.info("R_SAME_ORIGIN_FQDN2FQDN resultMap初始化完成"); + String sql = getRelationshipFqdnSameFqdnSql(); + long start = System.currentTimeMillis(); + try { + connection = manger.getConnection(); + statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery(sql); + while (resultSet.next()) { + BaseEdgeDocument newDoc = getRelationshipFqdnSameFqdnDocument(resultSet); + putMapByHashcode(newDoc, newRelationFqdnSameFqdnMap); + } + long last = System.currentTimeMillis(); + LOG.info(sql + "\n读取clickhouse EIpVisitFqdn时间:" + (last - start)); + }catch (Exception e){ + e.printStackTrace(); + }finally { + manger.clear(statement,connection); + } + } + void baseRelationshipIpVisitFqdn() { initializeMap(newRelationIpVisitFqdnMap); LOG.info("R_VISIT_IP2FQDN resultMap初始化完成"); diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/dao/UpdateGraphData.java b/ip-learning-java-test/src/main/java/cn/ac/iie/dao/UpdateGraphData.java index b888411..da4c1a2 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/dao/UpdateGraphData.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/dao/UpdateGraphData.java @@ -45,8 +45,8 @@ public class UpdateGraphData { baseArangoData.readHistoryData("IP", historyVertexIpMap,BaseDocument.class); updateVertexIp(); - baseArangoData.readHistoryData("SUBSCRIBER", historyVertexSubscriberMap,BaseDocument.class); - updateVertexSubscriber(); +// baseArangoData.readHistoryData("SUBSCRIBER", historyVertexSubscriberMap,BaseDocument.class); +// updateVertexSubscriber(); baseArangoData.readHistoryData("R_LOCATE_FQDN2IP", historyRelationFqdnAddressIpMap,BaseEdgeDocument.class); updateRelationFqdnAddressIp(); @@ -54,8 +54,11 @@ public class UpdateGraphData { baseArangoData.readHistoryData("R_VISIT_IP2FQDN", historyRelationIpVisitFqdnMap,BaseEdgeDocument.class); updateRelationIpVisitFqdn(); - baseArangoData.readHistoryData("R_LOCATE_SUBSCRIBER2IP", historyRelationSubsciberLocateIpMap,BaseEdgeDocument.class); - updateRelationshipSubsciberLocateIp(); + baseArangoData.readHistoryData("R_SAME_ORIGIN_FQDN2FQDN",historyRelationFqdnSameFqdnMap,BaseEdgeDocument.class); + updateRelationFqdnSameFqdn(); + +// baseArangoData.readHistoryData("R_LOCATE_SUBSCRIBER2IP", historyRelationSubsciberLocateIpMap,BaseEdgeDocument.class); +// updateRelationshipSubsciberLocateIp(); long last = System.currentTimeMillis(); LOG.info("更新图数据库时间共计:"+(last - start)); @@ -193,4 +196,25 @@ public class UpdateGraphData { } } + private void updateRelationFqdnSameFqdn(){ + try { + long start = System.currentTimeMillis(); + baseClickhouseData.baseRelationshipFqdnSameFqdn(); + countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); + for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { + HashMap> tmpMap = newRelationFqdnSameFqdnMap.get(i); + VisitIp2Fqdn ipVisitFqdn = new VisitIp2Fqdn(tmpMap,arangoManger,"R_SAME_ORIGIN_FQDN2FQDN", historyRelationFqdnSameFqdnMap,countDownLatch); + pool.executor(ipVisitFqdn); + } + countDownLatch.await(); + long last = System.currentTimeMillis(); + LOG.info("R_SAME_ORIGIN_FQDN2FQDN ralationship 更新完毕,共耗时:"+(last-start)); + }catch (Exception e){ + e.printStackTrace(); + }finally { + historyRelationFqdnSameFqdnMap.clear(); + newRelationFqdnSameFqdnMap.clear(); + } + } + } diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateEFqdnAddressIp.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateEFqdnAddressIp.java deleted file mode 100644 index bf36728..0000000 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateEFqdnAddressIp.java +++ /dev/null @@ -1,58 +0,0 @@ -package cn.ac.iie.service; - -import cn.ac.iie.config.ApplicationConfig; -import cn.ac.iie.dao.BaseArangoData; -import cn.ac.iie.utils.ArangoDBConnect; -import com.arangodb.entity.BaseDocument; -import com.arangodb.entity.BaseEdgeDocument; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Set; - -public class UpdateEFqdnAddressIp implements Runnable { - private HashMap documentHashMap; - - private static ArangoDBConnect arangoManger = ArangoDBConnect.getInstance(); - - public UpdateEFqdnAddressIp(HashMap documentHashMap) { - this.documentHashMap = documentHashMap; - } - @Override - public void run() { - Set keySet = documentHashMap.keySet(); - ArrayList docInsert = new ArrayList<>(); - ArrayList docUpdate = new ArrayList<>(); - int i = 0; - try { - for (String key:keySet){ - BaseEdgeDocument newEdgeDocument = documentHashMap.getOrDefault(key, null); - if (newEdgeDocument != null){ - i += 1; - BaseEdgeDocument edgeDocument = BaseArangoData.historyRelationFqdnAddressIpMap.getOrDefault(key, null); - if (edgeDocument != null){ - Object lastFoundTime = newEdgeDocument.getAttribute("LAST_FOUND_TIME"); - long countTotal = Long.parseLong(newEdgeDocument.getAttribute("COUNT_TOTAL").toString()); - long updateCountTotal = Long.parseLong(edgeDocument.getAttribute("COUNT_TOTAL").toString()); - edgeDocument.addAttribute("LAST_FOUND_TIME",lastFoundTime); - edgeDocument.addAttribute("COUNT_TOTAL",countTotal+updateCountTotal); - docInsert.add(edgeDocument); - }else { - docUpdate.add(newEdgeDocument); - } - } - if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH){ - arangoManger.insertAndUpdate(docInsert,docUpdate,"E_ADDRESS_V_FQDN_TO_V_IP"); - System.out.println("更新"+i); - i = 0; - } - } - if (i != 0){ - arangoManger.insertAndUpdate(docInsert,docUpdate,"E_ADDRESS_V_FQDN_TO_V_IP"); - System.out.println("更新"+i); - } - }catch (Exception e){ - e.printStackTrace(); - } - } -} diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateEIpVisitFqdn.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateEIpVisitFqdn.java deleted file mode 100644 index 092e794..0000000 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateEIpVisitFqdn.java +++ /dev/null @@ -1,58 +0,0 @@ -package cn.ac.iie.service; - -import cn.ac.iie.config.ApplicationConfig; -import cn.ac.iie.dao.BaseArangoData; -import cn.ac.iie.utils.ArangoDBConnect; -import com.arangodb.entity.BaseDocument; -import com.arangodb.entity.BaseEdgeDocument; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Set; - -public class UpdateEIpVisitFqdn implements Runnable { - private HashMap documentHashMap; - - private static final ArangoDBConnect arangoManger = ArangoDBConnect.getInstance(); - - public UpdateEIpVisitFqdn(HashMap documentHashMap) { - this.documentHashMap = documentHashMap; - } - @Override - public void run() { - Set keySet = documentHashMap.keySet(); - ArrayList docInsert = new ArrayList<>(); - ArrayList docUpdate = new ArrayList<>(); - int i = 0; - try { - for (String key:keySet){ - BaseEdgeDocument newEdgeDocument = documentHashMap.getOrDefault(key, null); - if (newEdgeDocument != null){ - i += 1; - BaseEdgeDocument edgeDocument = BaseArangoData.historyRelationIpVisitFqdnMap.getOrDefault(key, null); - if (edgeDocument != null){ - Object lastFoundTime = newEdgeDocument.getAttribute("LAST_FOUND_TIME"); - long countTotal = Long.parseLong(newEdgeDocument.getAttribute("COUNT_TOTAL").toString()); - long updateCountTotal = Long.parseLong(edgeDocument.getAttribute("COUNT_TOTAL").toString()); - edgeDocument.addAttribute("LAST_FOUND_TIME",lastFoundTime); - edgeDocument.addAttribute("COUNT_TOTAL",countTotal+updateCountTotal); - docInsert.add(edgeDocument); - }else { - docUpdate.add(newEdgeDocument); - } - } - if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH){ - arangoManger.insertAndUpdate(docInsert,docUpdate,"E_VISIT_V_IP_TO_V_FQDN"); - System.out.println("更新"+i); - i = 0; - } - } - if (i != 0){ - arangoManger.insertAndUpdate(docInsert,docUpdate,"E_VISIT_V_IP_TO_V_FQDN"); - System.out.println("更新"+i); - } - }catch (Exception e){ - e.printStackTrace(); - } - } -} diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateVFqdn.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateVFqdn.java deleted file mode 100644 index f14b69a..0000000 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateVFqdn.java +++ /dev/null @@ -1,94 +0,0 @@ -package cn.ac.iie.service; - -import cn.ac.iie.config.ApplicationConfig; -import cn.ac.iie.dao.BaseArangoData; -import cn.ac.iie.utils.ArangoDBConnect; -import com.arangodb.entity.BaseDocument; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; - -public class UpdateVFqdn implements Runnable{ - - private HashMap> documentHashMap; - - private static ArangoDBConnect arangoManger = ArangoDBConnect.getInstance(); - - public UpdateVFqdn(HashMap> documentHashMap) { - this.documentHashMap = documentHashMap; - } - - @Override - public void run() { - Set keySet = documentHashMap.keySet(); - ArrayList docInsert = new ArrayList<>(); - ArrayList docUpdate = new ArrayList<>(); - int i = 0; - try { - for (String key:keySet){ - ArrayList documentArrayList = documentHashMap.getOrDefault(key, null); - BaseDocument newDocument = mergeVFqdn(documentArrayList); - - if (newDocument != null){ - i += 1; - BaseDocument document = BaseArangoData.historyVertexFqdnMap.getOrDefault(key, null); - if (document != null){ - Object lastFoundTime = newDocument.getAttribute("LAST_FOUND_TIME"); - long fqdnCountTotal = Long.parseLong(newDocument.getAttribute("FQDN_COUNT_TOTAL").toString()); - long countTotal = Long.parseLong(document.getAttribute("FQDN_COUNT_TOTAL").toString()); - document.addAttribute("LAST_FOUND_TIME",lastFoundTime); - document.addAttribute("FQDN_COUNT_TOTAL",countTotal+fqdnCountTotal); - docUpdate.add(document); - }else { - docInsert.add(newDocument); - } - } - if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH){ - arangoManger.insertAndUpdate(docInsert,docUpdate,"V_FQDN"); - System.out.println("更新"+i); - i = 0; - } - } - if (i != 0){ - arangoManger.insertAndUpdate(docInsert,docUpdate,"V_FQDN"); - System.out.println("更新"+i); - } - }catch (Exception e){ - e.printStackTrace(); - } - - } - - private BaseDocument mergeVFqdn(ArrayList documentArrayList){ - if (documentArrayList == null || documentArrayList.isEmpty()){ - return null; - }else if (documentArrayList.size() == 1){ - return documentArrayList.get(0); - }else { - BaseDocument document = new BaseDocument(); - Map properties = document.getProperties(); - for (BaseDocument doc:documentArrayList){ - if (properties.isEmpty()){ - document = doc; - properties = doc.getProperties(); - }else { - long firstFoundTime = Long.parseLong(properties.getOrDefault("FIRST_FOUND_TIME", 0L).toString()); - long docFirstFoundTime = Long.parseLong(doc.getAttribute("FIRST_FOUND_TIME").toString()); - properties.put("FIRST_FOUND_TIME",firstFoundTimedocLastFoundTime? lastFoundTime:docLastFoundTime); - - long fqdnCountTotal = Long.parseLong(properties.getOrDefault("FQDN_COUNT_TOTAL", 0L).toString()); - long docFqdnCountTotal = Long.parseLong(doc.getAttribute("FQDN_COUNT_TOTAL").toString()); - properties.put("FQDN_COUNT_TOTAL",fqdnCountTotal+docFqdnCountTotal); - } - } - document.setProperties(properties); - return document; - } - } -} diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateVIP.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateVIP.java deleted file mode 100644 index 3b83769..0000000 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateVIP.java +++ /dev/null @@ -1,60 +0,0 @@ -package cn.ac.iie.service; - - -import cn.ac.iie.config.ApplicationConfig; -import cn.ac.iie.dao.BaseArangoData; -import cn.ac.iie.utils.ArangoDBConnect; -import com.arangodb.entity.BaseDocument; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Set; - -public class UpdateVIP implements Runnable { - - private HashMap documentHashMap; - - private static final ArangoDBConnect arangoManger = ArangoDBConnect.getInstance(); - - public UpdateVIP(HashMap documentHashMap) { - this.documentHashMap = documentHashMap; - } - - @Override - public void run() { - Set keySet = documentHashMap.keySet(); - ArrayList docInsert = new ArrayList<>(); - ArrayList docUpdate = new ArrayList<>(); - int i = 0; - try { - for (String key:keySet){ - BaseDocument newDocument = documentHashMap.getOrDefault(key, null); - if (newDocument != null){ - i += 1; - BaseDocument document = BaseArangoData.historyVertexIpMap.getOrDefault(key, null); - if (document != null){ - Object lastFoundTime = newDocument.getAttribute("LAST_FOUND_TIME"); - long ipCountTotal = Long.parseLong(newDocument.getAttribute("IP_COUNT_TOTAL").toString()); - long countTotal = Long.parseLong(document.getAttribute("IP_COUNT_TOTAL").toString()); - document.addAttribute("LAST_FOUND_TIME",lastFoundTime); - document.addAttribute("IP_COUNT_TOTAL",countTotal+ipCountTotal); - docUpdate.add(document); - }else { - docInsert.add(newDocument); - } - } - if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH){ - arangoManger.insertAndUpdate(docInsert,docUpdate,"V_IP"); - System.out.println("更新"+i); - i = 0; - } - } - if (i != 0){ - arangoManger.insertAndUpdate(docInsert,docUpdate,"V_IP"); - System.out.println("更新"+i); - } - }catch (Exception e){ - e.printStackTrace(); - } - } -} diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/read/ReadClickhouseData.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/read/ReadClickhouseData.java index baef520..fdc8f86 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/service/read/ReadClickhouseData.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/read/ReadClickhouseData.java @@ -1,6 +1,7 @@ package cn.ac.iie.service.read; import cn.ac.iie.config.ApplicationConfig; +import cn.ac.iie.utils.TopDomainUtils; import com.arangodb.entity.BaseDocument; import com.arangodb.entity.BaseEdgeDocument; import org.slf4j.Logger; @@ -24,17 +25,20 @@ public class ReadClickhouseData { private static final Logger LOG = LoggerFactory.getLogger(ReadClickhouseData.class); - public static HashSet protocolSet; + public static final Integer DISTINCT_CLIENT_IP_NUM = 100; + public static final Integer RECENT_COUNT_HOUR = 24; + public static final HashSet PROTOCOL_SET; static { - protocolSet = new HashSet<>(); - protocolSet.add("HTTP"); - protocolSet.add("TLS"); - protocolSet.add("DNS"); + PROTOCOL_SET = new HashSet<>(); + PROTOCOL_SET.add("HTTP"); + PROTOCOL_SET.add("TLS"); + PROTOCOL_SET.add("DNS"); } public static BaseDocument getVertexFqdnDocument(ResultSet resultSet) throws SQLException { - String fqdnName = resultSet.getString("FQDN"); + String fqdnOrReferer = resultSet.getString("FQDN"); + String fqdnName = TopDomainUtils.getDomainFromUrl(fqdnOrReferer); BaseDocument newDoc = null; if (isDomain(fqdnName)) { long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); @@ -74,8 +78,13 @@ public class ReadClickhouseData { newDoc.addAttribute("CLIENT_BYTES_SUM", 0L); break; default: + newDoc.addAttribute("SERVER_SESSION_COUNT", 0L); + newDoc.addAttribute("SERVER_BYTES_SUM", 0L); + newDoc.addAttribute("CLIENT_SESSION_COUNT", 0L); + newDoc.addAttribute("CLIENT_BYTES_SUM", 0L); + break; } - newDoc.addAttribute("COMMON_LINK_INFO", ""); +// newDoc.addAttribute("COMMON_LINK_INFO", ""); return newDoc; } @@ -118,7 +127,6 @@ public class ReadClickhouseData { long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); long countTotal = resultSet.getLong("COUNT_TOTAL"); - String schemaType = resultSet.getString("schema_type"); String[] distCipRecents = (String[]) resultSet.getArray("DIST_CIP_RECENT").getArray(); long[] clientIpTs = new long[distCipRecents.length]; for (int i = 0; i < clientIpTs.length; i++) { @@ -132,14 +140,31 @@ public class ReadClickhouseData { newDoc.setTo("IP/" + vIp); newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime); newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime); + newDoc.addAttribute("CNT_TOTAL",countTotal); newDoc.addAttribute("DIST_CIP", distCipRecents); newDoc.addAttribute("DIST_CIP_TS", clientIpTs); - initSchemaProperty(newDoc); + } + return newDoc; + } - if (protocolSet.contains(schemaType)){ - checkSchemaProperty(newDoc, schemaType, countTotal); - } + public static BaseEdgeDocument getRelationshipFqdnSameFqdnDocument(ResultSet resultSet) throws SQLException { + BaseEdgeDocument newDoc = null; + String domainFqdn = resultSet.getString("domainFqdn"); + String referer = resultSet.getString("referer"); + String refererFqdn = TopDomainUtils.getDomainFromUrl(referer); + if (isDomain(refererFqdn) && isDomain(domainFqdn)){ + long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); + long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); + long countTotal = resultSet.getLong("COUNT_TOTAL"); + String key = domainFqdn + "-" + refererFqdn; + newDoc = new BaseEdgeDocument(); + newDoc.setKey(key); + newDoc.setFrom("FQDN/" + domainFqdn); + newDoc.setTo("FQDN/" + refererFqdn); + newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime); + newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime); + newDoc.addAttribute("CNT_TOTAL",countTotal); } return newDoc; } @@ -153,20 +178,14 @@ public class ReadClickhouseData { long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); long countTotal = resultSet.getLong("COUNT_TOTAL"); - String schemaType = resultSet.getString("schema_type"); newDoc = new BaseEdgeDocument(); newDoc.setKey(key); newDoc.setFrom("IP/" + vIp); newDoc.setTo("FQDN/" + vFqdn); + newDoc.addAttribute("CNT_TOTAL",countTotal); newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime); newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime); - - initSchemaProperty(newDoc); - - if (protocolSet.contains(schemaType)){ - checkSchemaProperty(newDoc, schemaType, countTotal); - } } return newDoc; } @@ -184,6 +203,9 @@ public class ReadClickhouseData { private static boolean isDomain(String fqdn) { try { + if (fqdn == null || fqdn.length() == 0){ + return false; + } String[] fqdnArr = fqdn.split("\\."); if (fqdnArr.length < 4 || fqdnArr.length > 4) { return true; @@ -191,7 +213,7 @@ public class ReadClickhouseData { for (String f : fqdnArr) { if (pattern.matcher(f).matches()) { - int i = Integer.parseInt(f); + long i = Long.parseLong(f); if (i < 0 || i > 255) { return true; } @@ -207,41 +229,38 @@ public class ReadClickhouseData { private static void checkSchemaProperty(BaseEdgeDocument newDoc, String schema, long countTotal) { - long[] recentCnt = new long[24]; + long[] recentCnt = new long[RECENT_COUNT_HOUR]; recentCnt[0] = countTotal; - String protocolRecent = schema +"_CNT_RECENT"; - String protocolTotal = schema + "_CNT_TOTAL"; - newDoc.updateAttribute(protocolTotal, countTotal); - newDoc.updateAttribute(protocolRecent, recentCnt); - newDoc.addAttribute("PROTOCOL_TYPE", schema); - } - - private static void initSchemaProperty(BaseEdgeDocument newDoc){ - newDoc.addAttribute("HTTP_CNT_TOTAL", 0L); - newDoc.addAttribute("HTTP_CNT_RECENT", new long[24]); - newDoc.addAttribute("TLS_CNT_TOTAL", 0L); - newDoc.addAttribute("TLS_CNT_RECENT", new long[24]); - newDoc.addAttribute("DNS_CNT_TOTAL", 0L); - newDoc.addAttribute("DNS_CNT_RECENT", new long[24]); + for (String protocol: PROTOCOL_SET){ + String protocolRecent = protocol +"_CNT_RECENT"; + String protocolTotal = protocol + "_CNT_TOTAL"; + if (protocol.equals(schema)){ + newDoc.addAttribute(protocolTotal, countTotal); + newDoc.addAttribute(protocolRecent, recentCnt); + }else { + newDoc.addAttribute(protocolTotal, 0L); + newDoc.addAttribute(protocolRecent, new long[RECENT_COUNT_HOUR]); + } + } } public static String getVertexFqdnSql() { long[] timeLimit = getTimeLimit(); long maxTime = timeLimit[0]; long minTime = timeLimit[1]; - String where = "common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime; - String sslSql = "SELECT ssl_sni AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'SSL' GROUP BY ssl_sni"; - String httpSql = "SELECT http_host AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'HTTP' GROUP BY http_host"; - return "SELECT FQDN,MAX( LAST_FOUND_TIME ) AS LAST_FOUND_TIME,MIN( FIRST_FOUND_TIME ) AS FIRST_FOUND_TIME FROM ((" + sslSql + ") UNION ALL (" + httpSql + ")) GROUP BY FQDN HAVING FQDN != ''"; + String where = "recv_time >= "+minTime+" and recv_time <= "+maxTime; + String mediaDomainSql = "SELECT s1_domain AS FQDN,MIN(recv_time) AS FIRST_FOUND_TIME,MAX(recv_time) AS LAST_FOUND_TIME FROM media_expire_patch WHERE "+where+" and s1_domain != '' GROUP BY s1_domain"; + String refererSql = "SELECT s1_referer AS FQDN,MIN(recv_time) AS FIRST_FOUND_TIME,MAX(recv_time) AS LAST_FOUND_TIME FROM media_expire_patch WHERE "+where+" and s1_referer != '' GROUP BY s1_referer"; + return "SELECT * FROM((" + mediaDomainSql + ") UNION ALL (" + refererSql + "))"; } public static String getVertexIpSql() { long[] timeLimit = getTimeLimit(); long maxTime = timeLimit[0]; long minTime = timeLimit[1]; - String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime; - String clientIpSql = "SELECT common_client_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_c2s_byte_num) as BYTES_SUM,'client' as ip_type FROM tsg_galaxy_v3.connection_record_log where " + where + " group by IP"; - String serverIpSql = "SELECT common_server_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_s2c_byte_num) as BYTES_SUM,'server' as ip_type FROM tsg_galaxy_v3.connection_record_log where " + where + " group by IP"; + String where = " recv_time >= " + minTime + " AND recv_time < " + maxTime; + String clientIpSql = "SELECT s1_s_ip AS IP, MIN(recv_time) AS FIRST_FOUND_TIME,MAX(recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(media_len) as BYTES_SUM,'client' as ip_type FROM media_expire_patch where " + where + " group by IP"; + String serverIpSql = "SELECT s1_d_ip AS IP, MIN(recv_time) AS FIRST_FOUND_TIME,MAX(recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(media_len) as BYTES_SUM,'server' as ip_type FROM media_expire_patch where " + where + " group by IP"; return "SELECT * FROM((" + clientIpSql + ") UNION ALL (" + serverIpSql + "))"; } @@ -249,20 +268,24 @@ public class ReadClickhouseData { long[] timeLimit = getTimeLimit(); long maxTime = timeLimit[0]; long minTime = timeLimit[1]; - String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime; - String sslSql = "SELECT ssl_sni AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,groupUniqArray(100)(common_client_ip) AS DIST_CIP_RECENT,'TLS' AS schema_type FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'SSL' GROUP BY ssl_sni,common_server_ip"; - String httpSql = "SELECT http_host AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,groupUniqArray(100)(common_client_ip) AS DIST_CIP_RECENT,'HTTP' AS schema_type FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'HTTP' GROUP BY http_host,common_server_ip"; - return "SELECT * FROM ((" + sslSql + ") UNION ALL (" + httpSql + "))WHERE FQDN != ''"; + String where = "recv_time >= "+minTime+" and recv_time <= "+maxTime +" AND s1_domain != '' AND s1_d_ip != '' "; + return "SELECT s1_domain AS FQDN,s1_d_ip AS common_server_ip,MIN( recv_time ) AS FIRST_FOUND_TIME,MAX( recv_time ) AS LAST_FOUND_TIME,COUNT( * ) AS COUNT_TOTAL,groupUniqArray("+DISTINCT_CLIENT_IP_NUM+")(s1_s_ip) AS DIST_CIP_RECENT FROM media_expire_patch WHERE "+where+" GROUP BY s1_d_ip,s1_domain"; + } + + public static String getRelationshipFqdnSameFqdnSql(){ + long[] timeLimit = getTimeLimit(); + long maxTime = timeLimit[0]; + long minTime = timeLimit[1]; + String where = "recv_time >= "+minTime+" and recv_time <= "+maxTime +" AND s1_domain != '' AND s1_referer != '' "; + return "SELECT s1_domain AS domainFqdn,s1_referer AS referer,MIN(recv_time) AS FIRST_FOUND_TIME,MAX(recv_time) AS LAST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL FROM media_expire_patch where "+where+" GROUP BY s1_domain,s1_referer"; } public static String getRelationshipIpVisitFqdnSql() { long[] timeLimit = getTimeLimit(); long maxTime = timeLimit[0]; long minTime = timeLimit[1]; - String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime; - String httpSql = "SELECT http_host AS FQDN,common_client_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,'HTTP' AS schema_type FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'HTTP' GROUP BY http_host,common_client_ip"; - String sslSql = "SELECT ssl_sni AS FQDN,common_client_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,'TLS' AS schema_type FROM tsg_galaxy_v3.connection_record_log WHERE common_schema_type = 'SSL' GROUP BY ssl_sni,common_client_ip"; - return "SELECT * FROM ((" + sslSql + ") UNION ALL (" + httpSql + "))WHERE FQDN != ''"; + String where = "recv_time >= "+minTime+" and recv_time <= "+maxTime+" AND s1_s_ip != '' AND s1_domain != '' "; + return "SELECT s1_s_ip AS common_client_ip,s1_domain AS FQDN,MIN( recv_time ) AS FIRST_FOUND_TIME,MAX( recv_time ) AS LAST_FOUND_TIME,COUNT( * ) AS COUNT_TOTAL FROM media_expire_patch WHERE "+where+" GROUP BY s1_s_ip,s1_domain"; } public static String getVertexSubscriberSql() { diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java index b5f4619..93a0e4d 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java @@ -44,15 +44,6 @@ public class ReadHistoryArangoData extends Thread { int i = 0; for (T doc : baseDocuments) { String key = doc.getKey(); - switch (table) { - case "R_LOCATE_FQDN2IP": - updateProtocolDocument(doc); - break; - case "R_VISIT_IP2FQDN": - updateProtocolDocument(doc); - break; - default: - } map.put(key, doc); i++; } @@ -68,7 +59,7 @@ public class ReadHistoryArangoData extends Thread { private void updateProtocolDocument(T doc) { if (doc.getProperties().containsKey("PROTOCOL_TYPE")) { - for (String protocol : ReadClickhouseData.protocolSet) { + for (String protocol : ReadClickhouseData.PROTOCOL_SET) { String protocolRecent = protocol + "_CNT_RECENT"; ArrayList cntRecent = (ArrayList) doc.getAttribute(protocolRecent); Long[] cntRecentsSrc = cntRecent.toArray(new Long[cntRecent.size()]); diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Document.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Document.java index 928ed87..834b1ff 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Document.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Document.java @@ -90,29 +90,40 @@ public class Document extends Thread{ }else if (newDocumentSchemaList.size() == 1){ return newDocumentSchemaList.get(0); }else { - T newDocument = type.newInstance(); - Map newProperties = newDocument.getProperties(); - for (T doc:newDocumentSchemaList){ - if (newProperties.isEmpty()){ - newDocument = doc; - newProperties = doc.getProperties(); +// T newDocument = type.newInstance(); + T newDocument = null; + for (T lastDoc:newDocumentSchemaList){ + if (newDocument == null){ + newDocument = lastDoc; }else { - mergeFunction(newProperties,doc); + mergeFunction(lastDoc,newDocument); } } - newDocument.setProperties(newProperties); return newDocument; } } - protected void mergeFunction(Map newProperties, T lastDoc) { - long firstFoundTime = Long.parseLong(newProperties.getOrDefault("FIRST_FOUND_TIME", 0L).toString()); - long docFirstFoundTime = Long.parseLong(lastDoc.getAttribute("FIRST_FOUND_TIME").toString()); - newProperties.put("FIRST_FOUND_TIME",firstFoundTimedocLastFoundTime? lastFoundTime:docLastFoundTime); + protected void putMinAttribute(T firstDoc,T lastDoc,String attribute){ + long firstMinAttribute = Long.parseLong(firstDoc.getAttribute(attribute).toString()); + long lastMinAttribute = Long.parseLong(lastDoc.getAttribute(attribute).toString()); + lastDoc.addAttribute(attribute,firstMinAttributelastMaxAttribute? firstMaxAttribute:lastMaxAttribute); + } + + protected void putSumAttribute(T firstDoc,T lastDoc,String attribute){ + long firstSumAttribute = Long.parseLong(firstDoc.getAttribute(attribute).toString()); + long lastSumAttribute = Long.parseLong(lastDoc.getAttribute(attribute).toString()); + lastDoc.addAttribute(attribute,firstSumAttribute+lastSumAttribute); } diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Relationship.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Relationship.java index 29e6ec2..447f7fa 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Relationship.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Relationship.java @@ -6,7 +6,6 @@ import com.arangodb.entity.BaseEdgeDocument; import java.util.ArrayList; import java.util.HashMap; -import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -46,26 +45,25 @@ public class Relationship extends Document { } @Override - protected void mergeFunction(Map newProperties, BaseEdgeDocument lastDoc) { - super.mergeFunction(newProperties, lastDoc); + protected void mergeFunction(BaseEdgeDocument lastDoc,BaseEdgeDocument newDocument) { + super.mergeFunction(lastDoc, newDocument); } - protected void mergeProtocol(Map newProperties, BaseEdgeDocument lastDoc) { + protected void mergeProtocol(BaseEdgeDocument lastDoc,BaseEdgeDocument newDocument) { String schema = lastDoc.getAttribute("PROTOCOL_TYPE").toString(); - if (ReadClickhouseData.protocolSet.contains(schema)){ - setProtocolProperties(schema,newProperties,lastDoc); + if (ReadClickhouseData.PROTOCOL_SET.contains(schema)){ + setProtocolProperties(schema,newDocument,lastDoc); } } - private void setProtocolProperties(String protocol,Map newProperties, BaseEdgeDocument lastDoc){ + private void setProtocolProperties(String protocol,BaseEdgeDocument lastDoc,BaseEdgeDocument newDocument){ String protocolRecent = protocol +"_CNT_RECENT"; String protocolTotal = protocol + "_CNT_TOTAL"; - long httpCntTotal = Long.parseLong(lastDoc.getAttribute(protocolTotal).toString()); - newProperties.put(protocolTotal, httpCntTotal); - long[] httpCntRecents = (long[]) lastDoc.getAttribute(protocolRecent); - newProperties.put(protocolRecent, httpCntRecents); - String protocolType = newProperties.get("PROTOCOL_TYPE").toString(); - newProperties.put("PROTOCOL_TYPE",addProcotolType(protocolType,protocol)); + putSumAttribute(lastDoc,newDocument,protocolTotal); + long[] cntRecents = (long[]) lastDoc.getAttribute(protocolRecent); + newDocument.addAttribute(protocolRecent, cntRecents); + String protocolType = newDocument.getAttribute("PROTOCOL_TYPE").toString(); + newDocument.addAttribute("PROTOCOL_TYPE",addProcotolType(protocolType,protocol)); } private String addProcotolType(String protocolType,String schema){ diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Vertex.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Vertex.java index eebbb74..83b7497 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Vertex.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Vertex.java @@ -4,7 +4,6 @@ import cn.ac.iie.utils.ArangoDBConnect; import com.arangodb.entity.BaseDocument; import java.util.ArrayList; import java.util.HashMap; -import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -28,8 +27,8 @@ public class Vertex extends Document { } @Override - protected void mergeFunction(Map properties, BaseDocument doc) { - super.mergeFunction(properties, doc); + protected void mergeFunction(BaseDocument lastDoc,BaseDocument newDocument) { + super.mergeFunction(lastDoc, newDocument); } @Override diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/relationship/LocateFqdn2Ip.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/relationship/LocateFqdn2Ip.java index 373e8d0..383ab97 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/relationship/LocateFqdn2Ip.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/relationship/LocateFqdn2Ip.java @@ -1,6 +1,5 @@ package cn.ac.iie.service.update.relationship; -import cn.ac.iie.service.read.ReadClickhouseData; import cn.ac.iie.service.update.Relationship; import cn.ac.iie.utils.ArangoDBConnect; import com.arangodb.entity.BaseEdgeDocument; @@ -9,6 +8,8 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import static cn.ac.iie.service.read.ReadClickhouseData.*; + public class LocateFqdn2Ip extends Relationship { public LocateFqdn2Ip(HashMap> newDocumentHashMap, @@ -20,18 +21,31 @@ public class LocateFqdn2Ip extends Relationship { } @Override - protected void mergeFunction(Map properties, BaseEdgeDocument schemaEdgeDoc){ - super.mergeFunction(properties,schemaEdgeDoc); - mergeProtocol(properties, schemaEdgeDoc); + protected void mergeFunction(BaseEdgeDocument lastDoc,BaseEdgeDocument newDocument){ + super.mergeFunction(lastDoc, newDocument); + mergeDistinctClientIp(lastDoc, newDocument); + putSumAttribute(lastDoc, newDocument,"CNT_TOTAL"); + } + + private void mergeDistinctClientIp(BaseEdgeDocument lastDoc,BaseEdgeDocument newDocument){ + HashSet clientIpSet = new HashSet<>(); + String[] distCips = (String[]) newDocument.getAttribute("DIST_CIP"); + String[] lastDistCips = (String[]) lastDoc.getAttribute("DIST_CIP"); + clientIpSet.addAll(Arrays.asList(distCips)); + clientIpSet.addAll(Arrays.asList(lastDistCips)); + long[] clientIpTs = new long[clientIpSet.size()]; + for (int i = 0; i < clientIpTs.length; i++) { + clientIpTs[i] = currentHour; + } + newDocument.addAttribute("DIST_CIP", clientIpSet.toArray()); + newDocument.addAttribute("DIST_CIP_TS", clientIpTs); } @Override protected void updateFunction(BaseEdgeDocument newEdgeDocument, BaseEdgeDocument historyEdgeDocument) { super.updateFunction(newEdgeDocument, historyEdgeDocument); - updateProcotol(historyEdgeDocument,"TLS",newEdgeDocument); - updateProcotol(historyEdgeDocument,"HTTP",newEdgeDocument); - updateProcotol(historyEdgeDocument,"DNS",newEdgeDocument); updateDistinctClientIp(newEdgeDocument, historyEdgeDocument); + putSumAttribute(newEdgeDocument, historyEdgeDocument,"CNT_TOTAL"); } private void updateDistinctClientIp(BaseEdgeDocument newEdgeDocument,BaseEdgeDocument edgeDocument){ @@ -45,7 +59,7 @@ public class LocateFqdn2Ip extends Relationship { } Object[] distCipRecent = (Object[])newEdgeDocument.getAttribute("DIST_CIP"); for (Object cip:distCipRecent){ - distCipToTs.put(cip.toString(), ReadClickhouseData.currentHour); + distCipToTs.put(cip.toString(), currentHour); } Map sortDistCip = sortMapByValue(distCipToTs); @@ -65,8 +79,8 @@ public class LocateFqdn2Ip extends Relationship { List> entryList = new ArrayList<>(oriMap.entrySet()); entryList.sort((o1, o2) -> o2.getValue().compareTo(o1.getValue())); - if(entryList.size() > 100){ - for(Map.Entry set:entryList.subList(0, 100)){ + if(entryList.size() > DISTINCT_CLIENT_IP_NUM){ + for(Map.Entry set:entryList.subList(0, DISTINCT_CLIENT_IP_NUM)){ sortedMap.put(set.getKey(), set.getValue()); } }else { diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/relationship/SameFqdn2Fqdn.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/relationship/SameFqdn2Fqdn.java new file mode 100644 index 0000000..93ffd96 --- /dev/null +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/relationship/SameFqdn2Fqdn.java @@ -0,0 +1,34 @@ +package cn.ac.iie.service.update.relationship; + +import cn.ac.iie.service.update.Relationship; +import cn.ac.iie.utils.ArangoDBConnect; +import com.arangodb.entity.BaseEdgeDocument; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; + +public class SameFqdn2Fqdn extends Relationship { + + public SameFqdn2Fqdn(HashMap> newDocumentHashMap, + ArangoDBConnect arangoManger, + String collectionName, + ConcurrentHashMap historyDocumentMap, + CountDownLatch countDownLatch) { + super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap, countDownLatch); + } + + @Override + protected void updateFunction(BaseEdgeDocument newEdgeDocument, BaseEdgeDocument historyEdgeDocument) { + super.updateFunction(newEdgeDocument, historyEdgeDocument); + putSumAttribute(newEdgeDocument,historyEdgeDocument,"CNT_TOTAL"); + } + + @Override + protected void mergeFunction(BaseEdgeDocument lastDoc,BaseEdgeDocument newDocument) { + super.mergeFunction(lastDoc, newDocument); + putSumAttribute(lastDoc,newDocument,"CNT_TOTAL"); + } + +} diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/relationship/VisitIp2Fqdn.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/relationship/VisitIp2Fqdn.java index 6565d84..1465106 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/relationship/VisitIp2Fqdn.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/relationship/VisitIp2Fqdn.java @@ -6,7 +6,6 @@ import com.arangodb.entity.BaseEdgeDocument; import java.util.ArrayList; import java.util.HashMap; -import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -22,14 +21,12 @@ public class VisitIp2Fqdn extends Relationship { @Override protected void updateFunction(BaseEdgeDocument newEdgeDocument, BaseEdgeDocument historyEdgeDocument) { super.updateFunction(newEdgeDocument, historyEdgeDocument); - updateProcotol(historyEdgeDocument,"TLS",newEdgeDocument); - updateProcotol(historyEdgeDocument,"HTTP",newEdgeDocument); - updateProcotol(historyEdgeDocument,"DNS",newEdgeDocument); + putSumAttribute(newEdgeDocument,historyEdgeDocument,"CNT_TOTAL"); } @Override - protected void mergeFunction(Map newProperties, BaseEdgeDocument lastDoc) { - super.mergeFunction(newProperties, lastDoc); - mergeProtocol(newProperties, lastDoc); + protected void mergeFunction(BaseEdgeDocument lastDoc,BaseEdgeDocument newDocument) { + super.mergeFunction(lastDoc, newDocument); + putSumAttribute(lastDoc,newDocument,"CNT_TOTAL"); } } diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/vertex/Ip.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/vertex/Ip.java index 4cdedbd..925816b 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/vertex/Ip.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/vertex/Ip.java @@ -6,7 +6,6 @@ import com.arangodb.entity.BaseDocument; import java.util.ArrayList; import java.util.HashMap; -import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -17,7 +16,7 @@ public class Ip extends Vertex { String collectionName, ConcurrentHashMap historyDocumentMap, CountDownLatch countDownLatch) { - super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap,countDownLatch); + super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap, countDownLatch); } @Override @@ -27,53 +26,23 @@ public class Ip extends Vertex { } @Override - protected void mergeFunction(Map properties, BaseDocument doc) { - super.mergeFunction(properties, doc); - mergeIpByType(properties,doc); + protected void mergeFunction(BaseDocument lastDoc, BaseDocument newDocument) { + super.mergeFunction(lastDoc, newDocument); + mergeIpByType(lastDoc, newDocument); } - private void mergeIpByType(Map properties, BaseDocument doc){ - Map mergeProperties = doc.getProperties(); - checkIpTypeProperty(properties,mergeProperties,"CLIENT_SESSION_COUNT"); - checkIpTypeProperty(properties,mergeProperties,"CLIENT_BYTES_SUM"); - checkIpTypeProperty(properties,mergeProperties,"SERVER_SESSION_COUNT"); - checkIpTypeProperty(properties,mergeProperties,"SERVER_BYTES_SUM"); + private void mergeIpByType(BaseDocument lastDoc, BaseDocument newDocument) { + putSumAttribute(lastDoc,newDocument,"CLIENT_SESSION_COUNT"); + putSumAttribute(lastDoc,newDocument,"CLIENT_BYTES_SUM"); + putSumAttribute(lastDoc,newDocument,"SERVER_SESSION_COUNT"); + putSumAttribute(lastDoc,newDocument,"SERVER_BYTES_SUM"); } - private void checkIpTypeProperty(Map properties,Map mergeProperties,String property){ - try { - if (!properties.containsKey(property)){ - properties.put(property,0L); - checkIpTypeProperty(properties,mergeProperties,property); - }else if ("0".equals(properties.get(property).toString()) && mergeProperties.containsKey(property)){ - if (!"0".equals(mergeProperties.get(property).toString())){ - properties.put(property,Long.parseLong(mergeProperties.get(property).toString())); - } - } - }catch (Exception e){ - e.printStackTrace(); - } - } - - private void updateIpByType(BaseDocument newDocument, BaseDocument historyDocument){ - addProperty(newDocument,historyDocument,"CLIENT_SESSION_COUNT"); - addProperty(newDocument,historyDocument,"CLIENT_BYTES_SUM"); - addProperty(newDocument,historyDocument,"SERVER_SESSION_COUNT"); - addProperty(newDocument,historyDocument,"SERVER_BYTES_SUM"); - } - - private void addProperty(BaseDocument newDocument, BaseDocument historyDocument,String property){ - try { - if (historyDocument.getProperties().containsKey(property)){ - long newProperty = Long.parseLong(newDocument.getAttribute(property).toString()); - long hisProperty = Long.parseLong(historyDocument.getAttribute(property).toString()); - historyDocument.updateAttribute(property,newProperty+hisProperty); - }else { - historyDocument.addAttribute(property,0L); - } - }catch (Exception e){ - e.printStackTrace(); - } + private void updateIpByType(BaseDocument newDocument, BaseDocument historyDocument) { + putSumAttribute(newDocument, historyDocument, "CLIENT_SESSION_COUNT"); + putSumAttribute(newDocument, historyDocument, "CLIENT_BYTES_SUM"); + putSumAttribute(newDocument, historyDocument, "SERVER_SESSION_COUNT"); + putSumAttribute(newDocument, historyDocument, "SERVER_BYTES_SUM"); } } diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java b/ip-learning-java-test/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java index e0de171..fc62f08 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java @@ -11,12 +11,15 @@ import com.arangodb.entity.MultiDocumentEntity; import com.arangodb.model.AqlQueryOptions; import com.arangodb.model.DocumentCreateOptions; import com.arangodb.util.MapBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collection; import java.util.Map; public class ArangoDBConnect { + private static final Logger LOG = LoggerFactory.getLogger(ArangoDBConnect.class); private static ArangoDB arangoDB = null; private static ArangoDBConnect conn = null; static { @@ -98,7 +101,7 @@ public class ArangoDBConnect { MultiDocumentEntity> documentCreateEntityMultiDocumentEntity = collection.insertDocuments(docOverwrite, documentCreateOptions); Collection errors = documentCreateEntityMultiDocumentEntity.getErrors(); for (ErrorEntity errorEntity:errors){ - System.out.println("写入arangoDB异常:"+errorEntity.getErrorMessage()); + LOG.debug("写入arangoDB异常:"+errorEntity.getErrorMessage()); } } }catch (Exception e){ diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/utils/ExecutorThreadPool.java b/ip-learning-java-test/src/main/java/cn/ac/iie/utils/ExecutorThreadPool.java index 29cc5a5..e3142ae 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/utils/ExecutorThreadPool.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/utils/ExecutorThreadPool.java @@ -1,11 +1,14 @@ package cn.ac.iie.utils; import cn.ac.iie.config.ApplicationConfig; +import com.google.common.util.concurrent.ThreadFactoryBuilder; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; +/** + * 线程池管理 + * @author wlh + */ public class ExecutorThreadPool { private static ExecutorService pool = null ; private static ExecutorThreadPool poolExecutor = null; @@ -15,7 +18,15 @@ public class ExecutorThreadPool { } private static void getThreadPool(){ - pool = Executors.newFixedThreadPool(ApplicationConfig.THREAD_POOL_NUMBER); + ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() + .setNameFormat("iplearning-application-pool-%d").build(); + + //Common Thread Pool + pool = new ThreadPoolExecutor(ApplicationConfig.THREAD_POOL_NUMBER, ApplicationConfig.THREAD_POOL_NUMBER*2, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy()); + +// pool = Executors.newFixedThreadPool(ApplicationConfig.THREAD_POOL_NUMBER); } public static ExecutorThreadPool getInstance(){ @@ -29,6 +40,7 @@ public class ExecutorThreadPool { pool.execute(command); } + @Deprecated public void awaitThreadTask(){ try { while (!pool.awaitTermination(ApplicationConfig.THREAD_AWAIT_TERMINATION_TIME, TimeUnit.SECONDS)) { diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/utils/TopDomainUtils.java b/ip-learning-java-test/src/main/java/cn/ac/iie/utils/TopDomainUtils.java index b2823d1..b736162 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/utils/TopDomainUtils.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/utils/TopDomainUtils.java @@ -120,37 +120,24 @@ public class TopDomainUtils { //通用方法,传入url,返回domain,这里的domain不包含端口号,含有:一定是v6 public static String getDomainFromUrl(String oriUrl) { //先按照?切分,排除后续干扰 - //后续操作不再涉及?号,排除http://在?后的情况 String url = oriUrl.split("[?]")[0]; - //获取file_path与domain - if (url.contains("http://") || url.contains("https://")) { - //包含http://或者https://时 - //获取domain - if (url.split("//")[1].split("/")[0].split(":").length <= 2) { - //按照:切分后最终长度为1或2,说明是v4 - String v4Domain = url.split("//")[1]//按照//切分,索引1包含domain - .split("/")[0]//按照/切分,索引0包含domain - .split(":")[0];//v4按照:切分去除domain上的端口号后,索引0为最终域名 - return v4Domain; - } else { - //按照:切分后长度>2,说明是v6地址,v6地址不包含端口号(暂定),只需要先切分//再切分/ - String v6Domain = url.split("//")[1]//按照//切分,索引1包含domain - .split("/")[0];//v6按照/切分后索引0就是domain - return v6Domain; - } + //排除http://或https://干扰 + url = url.replaceAll("https://","").replaceAll("http://",""); + String domain; + + //获取domain + if (url.split("/")[0].split(":").length <= 2) { + //按照:切分后最终长度为1或2,说明是v4 + domain = url + //按照/切分,索引0包含domain + .split("/")[0] + //v4按照:切分去除domain上的端口号后,索引0为最终域名 + .split(":")[0]; } else { - //无http://或者https:// - //获取domain - if (url.split("/")[0].split(":").length <= 2) { - //按照:切分后长度为1或2,说明为v4 - //无http://时直接按照/切分,索引0包含域名domain,再按照":"切分,0索引就是domain - String v4Domain = url.split("/")[0].split(":")[0]; - return v4Domain; - } else { - //按照:切分后长度>2,说明为v6,v6地址不包含端口号(暂定),只需要切分/取索引0 - String v6Domain = url.split("/")[0]; - return v6Domain; - } + //按照:切分后长度>2,说明是v6地址,v6地址不包含端口号(暂定),只需要先切分//再切分/ + domain = url.split("/")[0]; } + return domain; + } } diff --git a/ip-learning-java-test/src/main/resources/application.properties b/ip-learning-java-test/src/main/resources/application.properties index 313d233..92e602a 100644 --- a/ip-learning-java-test/src/main/resources/application.properties +++ b/ip-learning-java-test/src/main/resources/application.properties @@ -3,7 +3,8 @@ arangoDB.host=192.168.40.182 arangoDB.port=8529 arangoDB.user=root arangoDB.password=111111 -arangoDB.DB.name=ip-learning-test-0 +#arangoDB.DB.name=ip-learning-test +arangoDB.DB.name=insert_iplearn_index arangoDB.batch=100000 arangoDB.ttl=3600 @@ -12,5 +13,5 @@ update.arango.batch=10000 thread.pool.number=10 thread.await.termination.time=10 -read.clickhouse.max.time=1594809098 -read.clickhouse.min.time=1593792000 \ No newline at end of file +read.clickhouse.max.time=1571245220 +read.clickhouse.min.time=1571245210 \ No newline at end of file diff --git a/ip-learning-java-test/src/main/resources/clickhouse.properties b/ip-learning-java-test/src/main/resources/clickhouse.properties index 00ebd01..01689b5 100644 --- a/ip-learning-java-test/src/main/resources/clickhouse.properties +++ b/ip-learning-java-test/src/main/resources/clickhouse.properties @@ -1,6 +1,6 @@ drivers=ru.yandex.clickhouse.ClickHouseDriver -#db.id=192.168.40.193:8123/av_miner?socket_timeout=300000 -db.id=192.168.40.186:8123/tsg_galaxy_v3?socket_timeout=300000 +db.id=192.168.40.193:8123/av_miner?socket_timeout=300000 +#db.id=192.168.40.186:8123/tsg_galaxy_v3?socket_timeout=300000 mdb.user=default mdb.password=111111 initialsize=1