diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java index 6663b35..89518c7 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java @@ -66,22 +66,20 @@ public class BaseClickhouseData { DruidPooledConnection connection = manger.getConnection(); Statement statement = connection.createStatement(); ResultSet resultSet = statement.executeQuery(sql); -// HashSet fqdnSet = new HashSet<>(); while (resultSet.next()) { -// String commonSchemaType = resultSet.getString("common_schema_type"); -// String fqdnName = commonSchemaGetFqdn(commonSchemaType,resultSet); String fqdnName = resultSet.getString("FQDN"); -// fqdnSet.add(fqdnName); - long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); - long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); - BaseDocument newDoc = new BaseDocument(); - newDoc.setKey(fqdnName); - newDoc.addAttribute("FQDN_NAME", fqdnName); - newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime); - newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime); - int i = Math.abs(fqdnName.hashCode()) % ApplicationConfig.THREAD_POOL_NUMBER; - ArrayList documentList = vFqdnMap.getOrDefault(i, new ArrayList<>()); - documentList.add(newDoc); + if (isDomain(fqdnName)){ + long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); + long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); + BaseDocument newDoc = new BaseDocument(); + newDoc.setKey(fqdnName); + newDoc.addAttribute("FQDN_NAME", fqdnName); + newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime); + newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime); + int i = Math.abs(fqdnName.hashCode()) % ApplicationConfig.THREAD_POOL_NUMBER; + ArrayList documentList = vFqdnMap.getOrDefault(i, new ArrayList<>()); + documentList.add(newDoc); + } } long last = System.currentTimeMillis(); LOG.info(sql + "\n读取clickhouse v_FQDN时间:" + (last - start)); @@ -154,29 +152,31 @@ public class BaseClickhouseData { while (resultSet.next()) { String commonSchemaType = resultSet.getString("common_schema_type"); String vFqdn = resultSet.getString("FQDN"); - String vIp = resultSet.getString("common_server_ip"); - long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); - long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); - long countTotal = resultSet.getLong("COUNT_TOTAL"); - String[] distCipRecents = (String[]) resultSet.getArray("DIST_CIP_RECENT").getArray(); + if (isDomain(vFqdn)){ + String vIp = resultSet.getString("common_server_ip"); + long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); + long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); + long countTotal = resultSet.getLong("COUNT_TOTAL"); + String[] distCipRecents = (String[]) resultSet.getArray("DIST_CIP_RECENT").getArray(); - String key = vFqdn + "-" + vIp; - BaseEdgeDocument newDoc = new BaseEdgeDocument(); - newDoc.setKey(key); - newDoc.setFrom("FQDN/" + vFqdn); - newDoc.setTo("IP/" + vIp); - newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime); - newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime); - newDoc.addAttribute("COUNT_TOTAL", countTotal); - newDoc.addAttribute("DIST_CIP_RECENT", distCipRecents); - newDoc.addAttribute("DIST_CIP_TOTAL", distCipRecents); + String key = vFqdn + "-" + vIp; + BaseEdgeDocument newDoc = new BaseEdgeDocument(); + newDoc.setKey(key); + newDoc.setFrom("FQDN/" + vFqdn); + newDoc.setTo("IP/" + vIp); + newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime); + newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime); + newDoc.addAttribute("COUNT_TOTAL", countTotal); + newDoc.addAttribute("DIST_CIP_RECENT", distCipRecents); + newDoc.addAttribute("DIST_CIP_TOTAL", distCipRecents); - int hashMod = Math.abs(key.hashCode()) % ApplicationConfig.THREAD_POOL_NUMBER; - HashMap> documentHashMap = eFqdnAddressIpMap.getOrDefault(hashMod, new HashMap()); + int hashMod = Math.abs(key.hashCode()) % ApplicationConfig.THREAD_POOL_NUMBER; + HashMap> documentHashMap = eFqdnAddressIpMap.getOrDefault(hashMod, new HashMap()); - HashMap schemaHashMap = documentHashMap.getOrDefault(key, new HashMap<>()); - schemaHashMap.put(commonSchemaType, newDoc); - documentHashMap.put(key, schemaHashMap); + HashMap schemaHashMap = documentHashMap.getOrDefault(key, new HashMap<>()); + schemaHashMap.put(commonSchemaType, newDoc); + documentHashMap.put(key, schemaHashMap); + } } long last = System.currentTimeMillis(); LOG.info(sql + "\n读取clickhouse EFqdnAddressIp时间:" + (last - start)); @@ -203,24 +203,26 @@ public class BaseClickhouseData { String commonSchemaType = resultSet.getString("common_schema_type"); String vIp = resultSet.getString("common_client_ip"); String vFqdn = resultSet.getString("FQDN"); - String key = vIp + "-" + vFqdn; - long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); - long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); - long countTotal = resultSet.getLong("COUNT_TOTAL"); + if (isDomain(vFqdn)){ + String key = vIp + "-" + vFqdn; + long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); + long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); + long countTotal = resultSet.getLong("COUNT_TOTAL"); - BaseEdgeDocument newDoc = new BaseEdgeDocument(); - newDoc.setKey(key); - newDoc.setFrom("IP/" + vIp); - newDoc.setTo("FQDN/" + vFqdn); - newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime); - newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime); - newDoc.addAttribute("COUNT_TOTAL", countTotal); - int i = Math.abs(key.hashCode()) % ApplicationConfig.THREAD_POOL_NUMBER; - HashMap> documentHashMap = eIpVisitFqdnMap.getOrDefault(i, new HashMap()); + BaseEdgeDocument newDoc = new BaseEdgeDocument(); + newDoc.setKey(key); + newDoc.setFrom("IP/" + vIp); + newDoc.setTo("FQDN/" + vFqdn); + newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime); + newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime); + newDoc.addAttribute("COUNT_TOTAL", countTotal); + int i = Math.abs(key.hashCode()) % ApplicationConfig.THREAD_POOL_NUMBER; + HashMap> documentHashMap = eIpVisitFqdnMap.getOrDefault(i, new HashMap()); - HashMap schemaHashMap = documentHashMap.getOrDefault(key, new HashMap<>()); - schemaHashMap.put(commonSchemaType, newDoc); - documentHashMap.put(key, schemaHashMap); + HashMap schemaHashMap = documentHashMap.getOrDefault(key, new HashMap<>()); + schemaHashMap.put(commonSchemaType, newDoc); + documentHashMap.put(key, schemaHashMap); + } } long last = System.currentTimeMillis(); LOG.info(sql + "\n读取clickhouse EIpVisitFqdn时间:" + (last - start)); @@ -235,6 +237,7 @@ public class BaseClickhouseData { } } + @Deprecated private static String commonSchemaGetFqdn(String commonSchemaType, ResultSet resultSet) { String vFqdn = ""; try { diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/BaseUpdateEtl.java b/IP-learning-graph/src/main/java/cn/ac/iie/etl/BaseUpdateEtl.java index 75f7383..56ae5a2 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/etl/BaseUpdateEtl.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/etl/BaseUpdateEtl.java @@ -10,37 +10,45 @@ public class BaseUpdateEtl { BaseEdgeDocument newBaseEdgeDocument = new BaseEdgeDocument(); Set schemaSets = newEdgeDocumentSchemaMap.keySet(); + Map properties = newBaseEdgeDocument.getProperties(); + for (String schema : schemaSets) { BaseEdgeDocument schemaEdgeDoc = newEdgeDocumentSchemaMap.get(schema); - setSchemaCnt(schema,schemaEdgeDoc,newBaseEdgeDocument); - if (newBaseEdgeDocument.getKey() != null){ - Map properties = newBaseEdgeDocument.getProperties(); + if (!properties.isEmpty()){ setFoundTime(properties,schemaEdgeDoc); setDistinctClientIpBySchema(properties,schemaEdgeDoc); }else { - Map properties = schemaEdgeDoc.getProperties(); - properties.remove("COUNT_TOTAL"); newBaseEdgeDocument = schemaEdgeDoc; + properties = schemaEdgeDoc.getProperties(); } + setSchemaCnt(schema,schemaEdgeDoc,properties); } + properties.remove("COUNT_TOTAL"); + addSchemaProperty(properties); + + newBaseEdgeDocument.setProperties(properties); return newBaseEdgeDocument; } public static BaseEdgeDocument mergeIp2FqdnBySchema(HashMap newEdgeDocumentMap){ BaseEdgeDocument newBaseEdgeDocument = new BaseEdgeDocument(); Set schemaSets = newEdgeDocumentMap.keySet(); + Map properties = newBaseEdgeDocument.getProperties(); + for (String schema : schemaSets) { BaseEdgeDocument schemaEdgeDoc = newEdgeDocumentMap.get(schema); - setSchemaCnt(schema,schemaEdgeDoc,newBaseEdgeDocument); - if (newBaseEdgeDocument.getKey() != null){ - Map properties = newBaseEdgeDocument.getProperties(); + if (!properties.isEmpty()){ setFoundTime(properties,schemaEdgeDoc); }else { - Map properties = schemaEdgeDoc.getProperties(); - properties.remove("COUNT_TOTAL"); newBaseEdgeDocument = schemaEdgeDoc; + properties = schemaEdgeDoc.getProperties(); } + setSchemaCnt(schema,schemaEdgeDoc,properties); } + properties.remove("COUNT_TOTAL"); + addSchemaProperty(properties); + + newBaseEdgeDocument.setProperties(properties); return newBaseEdgeDocument; } @@ -53,11 +61,21 @@ public class BaseUpdateEtl { setDistinctClientIpByHistory(newEdgeDocument,edgeDocument); } + private static void addSchemaProperty(Map properties){ + if (!properties.containsKey("TLS_CNT_TOTAL")){ + properties.put("TLS_CNT_TOTAL",0L); + properties.put("TLS_CNT_RECENT",new long[7]); + }else if (!properties.containsKey("HTTP_CNT_TOTAL")){ + properties.put("HTTP_CNT_TOTAL",0L); + properties.put("HTTP_CNT_RECENT",new long[7]); + } + } + private static void setDistinctClientIpByHistory(BaseEdgeDocument newEdgeDocument,BaseEdgeDocument edgeDocument){ ArrayList distCipTotal = (ArrayList) edgeDocument.getAttribute("DIST_CIP_TOTAL"); String[] distCipTotalsSrc = distCipTotal.toArray(new String[distCipTotal.size()]); - String[] distCipRecentsSrc = (String[]) newEdgeDocument.getAttribute("DIST_CIP_RECENT"); + Object[] distCipRecentsSrc = (Object[])newEdgeDocument.getAttribute("DIST_CIP_RECENT"); if (distCipTotalsSrc.length == 30) { Object[] distCipTotals = mergeClientIp(distCipTotalsSrc, distCipRecentsSrc); edgeDocument.addAttribute("DIST_CIP_TOTAL", distCipTotals); @@ -88,8 +106,8 @@ public class BaseUpdateEtl { edgeDocument.addAttribute(totalSchema, countTotal + updateCountTotal); } - private static Object[] mergeClientIp(String[] distCipTotalsSrc,String[] distCipRecentsSrc){ - HashSet dIpSet = new HashSet<>(); + private static Object[] mergeClientIp(Object[] distCipTotalsSrc,Object[] distCipRecentsSrc){ + HashSet dIpSet = new HashSet<>(); dIpSet.addAll(Arrays.asList(distCipRecentsSrc)); dIpSet.addAll(Arrays.asList(distCipTotalsSrc)); Object[] distCipTotals = dIpSet.toArray(); @@ -103,7 +121,7 @@ public class BaseUpdateEtl { String[] schemaDistCipRecents = (String[]) schemaEdgeDoc.getAttribute("DIST_CIP_RECENT"); String[] distCipRecents = (String[]) properties.get("DIST_CIP_RECENT"); Object[] mergeClientIp = mergeClientIp(schemaDistCipRecents, distCipRecents); - properties.put("DIST_CIP_RECENT",mergeClientIp); + properties.put("DIST_CIP_RECENT", mergeClientIp); properties.put("DIST_CIP_TOTAL",mergeClientIp); } @@ -116,21 +134,21 @@ public class BaseUpdateEtl { properties.put("LAST_FOUND_TIME",schemaLastFoundTime>lastFoundTime?schemaLastFoundTime:lastFoundTime); } - private static void setSchemaCnt(String schema,BaseEdgeDocument schemaEdgeDoc,BaseEdgeDocument newBaseEdgeDocument){ + private static void setSchemaCnt(String schema,BaseEdgeDocument schemaEdgeDoc,Map properties){ switch (schema) { case "HTTP": long httpCntTotal = Long.parseLong(schemaEdgeDoc.getAttribute("COUNT_TOTAL").toString()); - newBaseEdgeDocument.addAttribute("HTTP_CNT_TOTAL", httpCntTotal); + properties.put("HTTP_CNT_TOTAL", httpCntTotal); long[] httpCntRecentsDst = new long[7]; httpCntRecentsDst[0] = httpCntTotal; - newBaseEdgeDocument.addAttribute("HTTP_CNT_RECENT", httpCntRecentsDst); + properties.put("HTTP_CNT_RECENT", httpCntRecentsDst); break; case "SSL": long tlsCntTotal = Long.parseLong(schemaEdgeDoc.getAttribute("COUNT_TOTAL").toString()); - newBaseEdgeDocument.addAttribute("TLS_CNT_TOTAL", tlsCntTotal); + properties.put("TLS_CNT_TOTAL", tlsCntTotal); long[] tlsCntRecentsDst = new long[7]; tlsCntRecentsDst[0] = tlsCntTotal; - newBaseEdgeDocument.addAttribute("TLS_CNT_RECENT", tlsCntRecentsDst); + properties.put("TLS_CNT_RECENT", tlsCntRecentsDst); break; } } diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/fqdn2ip/UpdateEFqdnAddressIp.java b/IP-learning-graph/src/main/java/cn/ac/iie/etl/fqdn2ip/UpdateEFqdnAddressIp.java index 58502a8..0efdb72 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/etl/fqdn2ip/UpdateEFqdnAddressIp.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/etl/fqdn2ip/UpdateEFqdnAddressIp.java @@ -50,7 +50,8 @@ public class UpdateEFqdnAddressIp implements Runnable { LOG.info("更新R_LOCATE_FQDN2IP:" + i); } } catch (Exception e) { - LOG.error(e.getMessage()); + e.printStackTrace(); + LOG.error(e.toString()); } } } diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/ip2fqdn/UpdateEIpVisitFqdn.java b/IP-learning-graph/src/main/java/cn/ac/iie/etl/ip2fqdn/UpdateEIpVisitFqdn.java index c741667..fa4cc4f 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/etl/ip2fqdn/UpdateEIpVisitFqdn.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/etl/ip2fqdn/UpdateEIpVisitFqdn.java @@ -54,7 +54,7 @@ public class UpdateEIpVisitFqdn implements Runnable { LOG.info("更新R_VISIT_IP2FQDN:" + i); } } catch (Exception e) { - LOG.error(e.getMessage()); + LOG.error(e.toString()); } } } diff --git a/IP-learning-graph/src/main/resources/application.properties b/IP-learning-graph/src/main/resources/application.properties index 4dd415a..753ac9e 100644 --- a/IP-learning-graph/src/main/resources/application.properties +++ b/IP-learning-graph/src/main/resources/application.properties @@ -1,5 +1,5 @@ #arangoDB参数配置 -arangoDB.host=192.168.40.127 +arangoDB.host=192.168.40.182 arangoDB.port=8529 arangoDB.user=root arangoDB.password=111111 @@ -13,5 +13,5 @@ update.arango.batch=10000 thread.pool.number=10 thread.await.termination.time=10 -read.clickhouse.max.time=1593162456 +read.clickhouse.max.time=1593582211 read.clickhouse.min.time=1592879247 \ No newline at end of file