diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/config/ApplicationConfig.java b/IP-learning-graph/src/main/java/cn/ac/iie/config/ApplicationConfig.java index 7e0492b..68f8398 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/config/ApplicationConfig.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/config/ApplicationConfig.java @@ -14,6 +14,7 @@ public class ApplicationConfig { public static final Integer ARANGODB_BATCH = ConfigUtils.getIntProperty( "arangoDB.batch"); public static final Integer UPDATE_ARANGO_BATCH = ConfigUtils.getIntProperty("update.arango.batch"); + public static final String ARANGODB_READ_LIMIT = ConfigUtils.getStringProperty("arangoDB.read.limit"); public static final Integer THREAD_POOL_NUMBER = ConfigUtils.getIntProperty( "thread.pool.number"); public static final Integer THREAD_AWAIT_TERMINATION_TIME = ConfigUtils.getIntProperty( "thread.await.termination.time"); diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java index fa5e88b..759e5b2 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java @@ -87,7 +87,7 @@ public class BaseArangoData { long diffTime = (maxTime - minTime) / ApplicationConfig.THREAD_POOL_NUMBER; long maxThreadTime = minTime + (threadNumber + 1)* diffTime; long minThreadTime = minTime + threadNumber * diffTime; - return "FOR doc IN "+table+" filter doc.FIRST_FOUND_TIME >= "+minThreadTime+" and doc.FIRST_FOUND_TIME <= "+maxThreadTime+" RETURN doc"; + return "FOR doc IN "+table+" filter doc.FIRST_FOUND_TIME >= "+minThreadTime+" and doc.FIRST_FOUND_TIME <= "+maxThreadTime+" " + ApplicationConfig.ARANGODB_READ_LIMIT + " RETURN doc"; } } diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadClickhouseData.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadClickhouseData.java index 05c28ea..12772bd 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadClickhouseData.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadClickhouseData.java @@ -65,6 +65,11 @@ public class ReadClickhouseData { long sessionCount = resultSet.getLong("SESSION_COUNT"); long bytesSum = resultSet.getLong("BYTES_SUM"); String ipType = resultSet.getString("ip_type"); + String[] commonLinkInfos = (String[]) resultSet.getArray("common_link_info").getArray(); + String commonLinkInfo = ""; + if (commonLinkInfos.length > 1){ + commonLinkInfo = commonLinkInfos[1]; + } newDoc.setKey(ip); newDoc.addAttribute("IP", ip); newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime); @@ -84,7 +89,7 @@ public class ReadClickhouseData { break; default: } - newDoc.addAttribute("COMMON_LINK_INFO", ""); + newDoc.addAttribute("COMMON_LINK_INFO", commonLinkInfo); } catch (Exception e) { e.printStackTrace(); } @@ -258,8 +263,8 @@ public class ReadClickhouseData { public static String getVertexIpSql() { String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime; - String clientIpSql = "SELECT common_client_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_c2s_byte_num) as BYTES_SUM,'client' as ip_type FROM tsg_galaxy_v3.connection_record_log where " + where + " group by IP"; - String serverIpSql = "SELECT common_server_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_s2c_byte_num) as BYTES_SUM,'server' as ip_type FROM tsg_galaxy_v3.connection_record_log where " + where + " group by IP"; + String clientIpSql = "SELECT common_client_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_c2s_byte_num) as BYTES_SUM,groupUniqArray(2)(common_link_info) as common_link_info,'client' as ip_type FROM tsg_galaxy_v3.connection_record_log where " + where + " group by IP"; + String serverIpSql = "SELECT common_server_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_s2c_byte_num) as BYTES_SUM,groupUniqArray(2)(common_link_info) as common_link_info,'server' as ip_type FROM tsg_galaxy_v3.connection_record_log where " + where + " group by IP"; return "SELECT * FROM((" + clientIpSql + ") UNION ALL (" + serverIpSql + "))"; } diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Document.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Document.java index 8d69b46..436b83d 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Document.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Document.java @@ -124,5 +124,10 @@ public class Document extends Thread{ lastDoc.addAttribute(attribute,firstSumAttribute+lastSumAttribute); } + protected void replaceAttribute(T firstDoc,T lastDoc,String attribute){ + Object attributeObj = firstDoc.getAttribute(attribute); + lastDoc.addAttribute(attribute,attributeObj); + } + } diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/vertex/Ip.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/vertex/Ip.java index 61bb0b9..9a44fac 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/vertex/Ip.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/vertex/Ip.java @@ -23,19 +23,13 @@ public class Ip extends Vertex { protected void updateFunction(BaseDocument newDocument, BaseDocument historyDocument) { super.updateFunction(newDocument, historyDocument); updateIpByType(newDocument, historyDocument); + super.replaceAttribute(newDocument,historyDocument,"COMMON_LINK_INFO"); } @Override protected void mergeFunction(BaseDocument lastDoc, BaseDocument newDocument) { super.mergeFunction(lastDoc, newDocument); - mergeIpByType(lastDoc, newDocument); - } - - private void mergeIpByType(BaseDocument lastDoc, BaseDocument newDocument) { - putSumAttribute(lastDoc,newDocument,"CLIENT_SESSION_COUNT"); - putSumAttribute(lastDoc,newDocument,"CLIENT_BYTES_SUM"); - putSumAttribute(lastDoc,newDocument,"SERVER_SESSION_COUNT"); - putSumAttribute(lastDoc,newDocument,"SERVER_BYTES_SUM"); + updateIpByType(lastDoc, newDocument); } private void updateIpByType(BaseDocument newDocument, BaseDocument historyDocument) { diff --git a/IP-learning-graph/src/main/resources/application.properties b/IP-learning-graph/src/main/resources/application.properties index c09d5d5..ed8bb1f 100644 --- a/IP-learning-graph/src/main/resources/application.properties +++ b/IP-learning-graph/src/main/resources/application.properties @@ -2,13 +2,14 @@ #arangoDB.host=192.168.40.182 arangoDB.host=192.168.40.224 arangoDB.port=8529 -arangoDB.user=root -arangoDB.password=111111 +arangoDB.user=upsert +arangoDB.password=ceiec2018 arangoDB.DB.name=ip-learning-test -#arangoDB.DB.name=ip-learning-test-0 +#arangoDB.DB.name=tsg_galaxy_v3 arangoDB.batch=100000 arangoDB.ttl=3600 +arangoDB.read.limit=limit 100 update.arango.batch=10000 thread.pool.number=10 @@ -16,9 +17,9 @@ thread.await.termination.time=10 #读取clickhouse时间范围方式,0:读取过去一小时,1:指定时间范围 -time.limit.type=1 -read.clickhouse.max.time=1596097930 -read.clickhouse.min.time=1593676894 +time.limit.type=0 +read.clickhouse.max.time=1595833062 +read.clickhouse.min.time=1595833060 update.interval=3600 distinct.client.ip.num=10000 diff --git a/IP-learning-graph/src/main/resources/clickhouse.properties b/IP-learning-graph/src/main/resources/clickhouse.properties index 3b0ddbb..f3607e9 100644 --- a/IP-learning-graph/src/main/resources/clickhouse.properties +++ b/IP-learning-graph/src/main/resources/clickhouse.properties @@ -1,9 +1,9 @@ drivers=ru.yandex.clickhouse.ClickHouseDriver -db.id=192.168.40.224:8123/tsg_galaxy_v3?socket_timeout=300000 -#db.id=192.168.40.186:8123/tsg_galaxy_v3?socket_timeout=300000 mdb.user=default -mdb.password=ceiec2019 +#db.id=192.168.40.186:8123/tsg_galaxy_v3?socket_timeout=300000 #mdb.password=111111 +db.id=192.168.40.224:8123/tsg_galaxy_v3?socket_timeout=300000 +mdb.password=ceiec2019 initialsize=1 minidle=1 maxactive=50