diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java index 02bb2d8..4c32287 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java @@ -129,7 +129,7 @@ public class BaseClickhouseData { while (resultSet.next()){ BaseEdgeDocument newDoc = getRelationshipSubsciberLocateIpDocument(resultSet); String key = newDoc.getKey(); - putMapByHashcode(resultSet, newDoc, eSubsciberLocateIpMap,key); + putMapByHashcode(newDoc, eSubsciberLocateIpMap,key); } long last = System.currentTimeMillis(); LOG.info(sql + "\n读取clickhouse ESubsciberLocateIp时间:" + (last - start)); @@ -154,7 +154,7 @@ public class BaseClickhouseData { while (resultSet.next()) { BaseEdgeDocument newDoc = getRelationFqdnAddressIpDocument(resultSet); String commonSchemaType = resultSet.getString("common_schema_type"); - putMapByHashcode(resultSet, newDoc, eFqdnAddressIpMap,commonSchemaType); + putMapByHashcode(newDoc, eFqdnAddressIpMap,commonSchemaType); } long last = System.currentTimeMillis(); LOG.info(sql + "\n读取clickhouse EFqdnAddressIp时间:" + (last - start)); @@ -177,7 +177,7 @@ public class BaseClickhouseData { while (resultSet.next()) { BaseEdgeDocument newDoc = getRelationIpVisitFqdnDocument(resultSet); String commonSchemaType = resultSet.getString("common_schema_type"); - putMapByHashcode(resultSet, newDoc, eIpVisitFqdnMap,commonSchemaType); + putMapByHashcode(newDoc, eIpVisitFqdnMap,commonSchemaType); } long last = System.currentTimeMillis(); LOG.info(sql + "\n读取clickhouse EIpVisitFqdn时间:" + (last - start)); diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/read/ReadClickhouseData.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/read/ReadClickhouseData.java index f9f96e7..2b48e7d 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/service/read/ReadClickhouseData.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/read/ReadClickhouseData.java @@ -32,27 +32,31 @@ public class ReadClickhouseData { } public static BaseDocument getVertexIpDocument(ResultSet resultSet) throws SQLException { + BaseDocument newDoc = new BaseDocument(); String ip = resultSet.getString("IP"); - String location = resultSet.getString("location"); - String[] locationSplit = location.split(";"); - String ipLocationNation; - String ipLocationRegion; - if (locationSplit.length == 3) { - ipLocationNation = locationSplit[0]; - ipLocationRegion = locationSplit[1]; - } else { - ipLocationNation = location; - ipLocationRegion = location; - } long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); - BaseDocument newDoc = new BaseDocument(); + long sessionCount = resultSet.getLong("SESSION_COUNT"); + long bytesSum = resultSet.getLong("BYTES_SUM"); + String ipType = resultSet.getString("ip_type"); newDoc.setKey(ip); newDoc.addAttribute("IP", ip); - newDoc.addAttribute("IP_LOCATION_NATION", ipLocationNation); - newDoc.addAttribute("IP_LOCATION_REGION", ipLocationRegion); newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime); newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime); + switch (ipType) { + case "client": + newDoc.addAttribute("CLIENT_SESSION_COUNT", sessionCount); + newDoc.addAttribute("CLIENT_BYTES_SUM", bytesSum); + newDoc.addAttribute("SERVER_SESSION_COUNT",0L); + newDoc.addAttribute("SERVER_BYTES_SUM",0L); + break; + case "server": + newDoc.addAttribute("SERVER_SESSION_COUNT", sessionCount); + newDoc.addAttribute("SERVER_BYTES_SUM", bytesSum); + newDoc.addAttribute("CLIENT_SESSION_COUNT",0L); + newDoc.addAttribute("CLIENT_BYTES_SUM",0L); + break; + } return newDoc; } @@ -133,8 +137,8 @@ public class ReadClickhouseData { return newDoc; } - public static void putMapByHashcode(ResultSet resultSet, BaseEdgeDocument newDoc, HashMap>> map,String schema) throws SQLException { - if (newDoc != null){ + public static void putMapByHashcode(BaseEdgeDocument newDoc, HashMap>> map, String schema) throws SQLException { + if (newDoc != null) { String key = newDoc.getKey(); int i = Math.abs(key.hashCode()) % ApplicationConfig.THREAD_POOL_NUMBER; HashMap> documentHashMap = map.getOrDefault(i, new HashMap()); @@ -145,7 +149,7 @@ public class ReadClickhouseData { } } - public static boolean isDomain(String fqdn) { + private static boolean isDomain(String fqdn) { try { String[] fqdnArr = fqdn.split("\\."); if (fqdnArr.length < 4 || fqdnArr.length > 4) { @@ -182,10 +186,10 @@ public class ReadClickhouseData { long[] timeLimit = getTimeLimit(); long maxTime = timeLimit[0]; long minTime = timeLimit[1]; - String where = " common_recv_time >= " + minTime + " AND common_recv_time <= " + maxTime + " AND (common_schema_type = 'HTTP' or common_schema_type = 'SSL')"; - String clientIpSql = "SELECT common_client_ip AS IP, common_client_location AS location, common_recv_time FROM tsg_galaxy_v3.connection_record_log where " + where; - String serverIpSql = "SELECT common_server_ip AS IP, common_server_location AS location, common_recv_time FROM tsg_galaxy_v3.connection_record_log where " + where; - return "SELECT IP,location,MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,COUNT(*) AS IP_COUNT_TOTAL FROM((" + clientIpSql + ") UNION ALL (" + serverIpSql + ")) GROUP BY IP,location"; + String where = " common_recv_time >= " + minTime + " AND common_recv_time <= " + maxTime; + String clientIpSql = "SELECT common_client_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_c2s_byte_num) as BYTES_SUM,'client' as ip_type FROM tsg_galaxy_v3.connection_record_log where " + where + " group by IP"; + String serverIpSql = "SELECT common_server_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_s2c_byte_num) as BYTES_SUM,'server' as ip_type FROM tsg_galaxy_v3.connection_record_log where " + where + " group by IP"; + return "SELECT * FROM((" + clientIpSql + ") UNION ALL (" + serverIpSql + "))"; } public static String getEFqdnAddressIpSql() { @@ -208,20 +212,20 @@ public class ReadClickhouseData { return "SELECT * FROM ((" + sslSql + ") UNION ALL (" + httpSql + "))WHERE FQDN != ''"; } - public static String getVertexSubscriberSql(){ + public static String getVertexSubscriberSql() { long[] timeLimit = getTimeLimit(); long maxTime = timeLimit[0]; long minTime = timeLimit[1]; - String where = " common_recv_time >= "+minTime+" AND common_recv_time <= "+maxTime+" AND common_subscriber_id != '' AND radius_packet_type = 4 AND radius_acct_status_type = 1"; - return "SELECT common_subscriber_id,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME FROM radius_record_log WHERE"+where+" GROUP BY common_subscriber_id"; + String where = " common_recv_time >= " + minTime + " AND common_recv_time <= " + maxTime + " AND common_subscriber_id != '' AND radius_packet_type = 4 AND radius_acct_status_type = 1"; + return "SELECT common_subscriber_id,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME FROM radius_record_log WHERE" + where + " GROUP BY common_subscriber_id"; } - public static String getRelationshipSubsciberLocateIpSql(){ + public static String getRelationshipSubsciberLocateIpSql() { long[] timeLimit = getTimeLimit(); long maxTime = timeLimit[0]; long minTime = timeLimit[1]; - String where = " common_recv_time >= "+minTime+" AND common_recv_time <= "+maxTime+" AND common_subscriber_id != '' AND radius_framed_ip != '' AND radius_packet_type = 4 AND radius_acct_status_type = 1"; - return "SELECT common_subscriber_id,radius_framed_ip,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME,COUNT(*) as COUNT_TOTAL FROM radius_record_log WHERE"+where+" GROUP BY common_subscriber_id,radius_framed_ip"; + String where = " common_recv_time >= " + minTime + " AND common_recv_time <= " + maxTime + " AND common_subscriber_id != '' AND radius_framed_ip != '' AND radius_packet_type = 4 AND radius_acct_status_type = 1"; + return "SELECT common_subscriber_id,radius_framed_ip,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME,COUNT(*) as COUNT_TOTAL FROM radius_record_log WHERE" + where + " GROUP BY common_subscriber_id,radius_framed_ip"; } private static long[] getTimeLimit() { diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Relationship.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Relationship.java index 6910325..02f3251 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Relationship.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Relationship.java @@ -3,12 +3,15 @@ package cn.ac.iie.service.update; import cn.ac.iie.config.ApplicationConfig; import cn.ac.iie.utils.ArangoDBConnect; import com.arangodb.entity.BaseEdgeDocument; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; public class Relationship extends Thread { + private static final Logger LOG = LoggerFactory.getLogger(Relationship.class); protected HashMap> newDocumentHashMap; protected ArangoDBConnect arangoManger; @@ -43,18 +46,18 @@ public class Relationship extends Thread { updateRelationship(newEdgeDocument,historyEdgeDocument,docInsert); if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) { arangoManger.overwrite(docInsert, collectionName); - System.out.println("更新"+collectionName+":" + i); + LOG.info("更新"+collectionName+":" + i); i = 0; } } } if (i != 0) { arangoManger.overwrite(docInsert, collectionName); - System.out.println("更新"+collectionName+":" + i); + LOG.info("更新"+collectionName+":" + i); } } catch (Exception e) { e.printStackTrace(); - System.out.println(e.toString()); + LOG.error(e.toString()); }finally { countDownLatch.countDown(); } diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Vertex.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Vertex.java index f34a510..f4c31bf 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Vertex.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Vertex.java @@ -4,6 +4,8 @@ import cn.ac.iie.config.ApplicationConfig; import cn.ac.iie.utils.ArangoDBConnect; import com.arangodb.entity.BaseDocument; import com.arangodb.entity.BaseEdgeDocument; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.HashMap; @@ -17,6 +19,7 @@ import java.util.concurrent.CountDownLatch; * 多线程更新vertex数据 */ public class Vertex extends Thread{ + private static final Logger LOG = LoggerFactory.getLogger(Vertex.class); protected HashMap> newDocumentHashMap; protected ArangoDBConnect arangoManger; @@ -52,16 +55,17 @@ public class Vertex extends Thread{ } if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH){ arangoManger.overwrite(docInsert,collectionName); - System.out.println("更新"+i); + LOG.info("更新"+collectionName+":"+i); i = 0; } } if (i != 0){ arangoManger.overwrite(docInsert,collectionName); - System.out.println("更新"+i); + LOG.info("更新"+collectionName+":"+i); } }catch (Exception e){ e.printStackTrace(); + LOG.error(e.toString()); }finally { countDownLatch.countDown(); } diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/vertex/Ip.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/vertex/Ip.java index b5c5610..daa53f7 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/service/vertex/Ip.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/vertex/Ip.java @@ -24,12 +24,57 @@ public class Ip extends Vertex { @Override protected void mergeFunction(Map properties, BaseDocument doc) { super.mergeFunction(properties, doc); + mergeIpByType(properties,doc); + } + private void mergeIpByType(Map properties, BaseDocument doc){ + Map mergeProperties = doc.getProperties(); + checkIpTypeProperty(properties,mergeProperties,"CLIENT_SESSION_COUNT"); + checkIpTypeProperty(properties,mergeProperties,"CLIENT_BYTES_SUM"); + checkIpTypeProperty(properties,mergeProperties,"SERVER_SESSION_COUNT"); + checkIpTypeProperty(properties,mergeProperties,"SERVER_BYTES_SUM"); + } + + private void checkIpTypeProperty(Map properties,Map mergeProperties,String property){ + try { + if (!properties.containsKey(property)){ + properties.put(property,0L); + checkIpTypeProperty(properties,mergeProperties,property); + }else if (properties.get(property).toString().equals("0") && mergeProperties.containsKey(property)){ + if (!mergeProperties.get(property).toString().equals("0")){ + properties.put(property,Long.parseLong(mergeProperties.get(property).toString())); + } + } + }catch (Exception e){ + e.printStackTrace(); + } } @Override protected void updateFunction(BaseDocument newDocument, BaseDocument historyDocument) { super.updateFunction(newDocument, historyDocument); - + updateIpByType(newDocument, historyDocument); } + + private void updateIpByType(BaseDocument newDocument, BaseDocument historyDocument){ + addProperty(newDocument,historyDocument,"CLIENT_SESSION_COUNT"); + addProperty(newDocument,historyDocument,"CLIENT_BYTES_SUM"); + addProperty(newDocument,historyDocument,"SERVER_SESSION_COUNT"); + addProperty(newDocument,historyDocument,"SERVER_BYTES_SUM"); + } + + private void addProperty(BaseDocument newDocument, BaseDocument historyDocument,String property){ + try { + if (historyDocument.getProperties().containsKey(property)){ + long newProperty = Long.parseLong(newDocument.getAttribute(property).toString()); + long hisProperty = Long.parseLong(historyDocument.getAttribute(property).toString()); + historyDocument.updateAttribute(property,newProperty+hisProperty); + }else { + historyDocument.addAttribute(property,0L); + } + }catch (Exception e){ + e.printStackTrace(); + } + } + } diff --git a/IP-learning-graph/src/main/resources/application.properties b/IP-learning-graph/src/main/resources/application.properties index 3a3942b..9428d55 100644 --- a/IP-learning-graph/src/main/resources/application.properties +++ b/IP-learning-graph/src/main/resources/application.properties @@ -13,5 +13,5 @@ update.arango.batch=10000 thread.pool.number=10 thread.await.termination.time=10 -read.clickhouse.max.time=1594194404 +read.clickhouse.max.time=1594376834 read.clickhouse.min.time=1593676953 \ No newline at end of file