diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/config/ApplicationConfig.java b/IP-learning-graph/src/main/java/cn/ac/iie/config/ApplicationConfig.java index 68f8398..8b9b405 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/config/ApplicationConfig.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/config/ApplicationConfig.java @@ -14,7 +14,7 @@ public class ApplicationConfig { public static final Integer ARANGODB_BATCH = ConfigUtils.getIntProperty( "arangoDB.batch"); public static final Integer UPDATE_ARANGO_BATCH = ConfigUtils.getIntProperty("update.arango.batch"); - public static final String ARANGODB_READ_LIMIT = ConfigUtils.getStringProperty("arangoDB.read.limit"); + public static final Long ARANGODB_READ_LIMIT = ConfigUtils.getLongProperty("arangoDB.read.limit"); public static final Integer THREAD_POOL_NUMBER = ConfigUtils.getIntProperty( "thread.pool.number"); public static final Integer THREAD_AWAIT_TERMINATION_TIME = ConfigUtils.getIntProperty( "thread.await.termination.time"); diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java index 4375d3a..ca39e22 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java @@ -129,6 +129,9 @@ public class BaseArangoData { private String getQuerySql(Long cnt,int threadNumber, String table){ long sepNum = cnt / ApplicationConfig.THREAD_POOL_NUMBER + 1; long offsetNum = threadNumber * sepNum; + if (sepNum > ApplicationConfig.ARANGODB_READ_LIMIT){ + sepNum = ApplicationConfig.ARANGODB_READ_LIMIT; + } return "FOR doc IN " + table + " limit "+offsetNum+","+sepNum+" RETURN doc"; } diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java b/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java index 1572db1..6afeb80 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java @@ -44,8 +44,8 @@ public class UpdateGraphData { long start = System.currentTimeMillis(); try { - updateDocument(newVertexFqdnMap, historyVertexFqdnMap, "FQDN", Fqdn.class,BaseDocument.class, - ReadClickhouseData::getVertexFqdnSql,ReadClickhouseData::getVertexFqdnDocument); +// updateDocument(newVertexFqdnMap, historyVertexFqdnMap, "FQDN", Fqdn.class,BaseDocument.class, +// ReadClickhouseData::getVertexFqdnSql,ReadClickhouseData::getVertexFqdnDocument); updateDocument(newVertexIpMap,historyVertexIpMap,"IP", Ip.class,BaseDocument.class, ReadClickhouseData::getVertexIpSql,ReadClickhouseData::getVertexIpDocument); @@ -79,8 +79,8 @@ public class UpdateGraphData { long start = System.currentTimeMillis(); try { - updateDocument("FQDN", Fqdn.class,BaseDocument.class, - ReadClickhouseData::getVertexFqdnSql,ReadClickhouseData::getVertexFqdnDocument); +// updateDocument("FQDN", Fqdn.class,BaseDocument.class, +// ReadClickhouseData::getVertexFqdnSql,ReadClickhouseData::getVertexFqdnDocument); updateDocument("IP", Ip.class,BaseDocument.class, ReadClickhouseData::getVertexIpSql,ReadClickhouseData::getVertexIpDocument); diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadClickhouseData.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadClickhouseData.java index 86981fd..d0a573a 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadClickhouseData.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadClickhouseData.java @@ -257,40 +257,40 @@ public class ReadClickhouseData { public static String getVertexFqdnSql() { String where = "common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime; - String sslSql = "SELECT ssl_sni AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'SSL' GROUP BY ssl_sni"; - String httpSql = "SELECT http_host AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'HTTP' GROUP BY http_host"; + String sslSql = "SELECT ssl_sni AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME FROM tsg_galaxy_v3.session_record WHERE " + where + " and common_schema_type = 'SSL' GROUP BY ssl_sni"; + String httpSql = "SELECT http_host AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME FROM tsg_galaxy_v3.session_record WHERE " + where + " and common_schema_type = 'HTTP' GROUP BY http_host"; return "SELECT FQDN,MAX( LAST_FOUND_TIME ) AS LAST_FOUND_TIME,MIN( FIRST_FOUND_TIME ) AS FIRST_FOUND_TIME FROM ((" + sslSql + ") UNION ALL (" + httpSql + ")) GROUP BY FQDN HAVING FQDN != ''"; } public static String getVertexIpSql() { String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime; - String clientIpSql = "SELECT common_client_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_c2s_byte_num+common_s2c_byte_num) as BYTES_SUM,groupUniqArray(2)(common_link_info) as common_link_info,'client' as ip_type FROM tsg_galaxy_v3.connection_record_log where " + where + " group by IP"; - String serverIpSql = "SELECT common_server_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_c2s_byte_num+common_s2c_byte_num) as BYTES_SUM,groupUniqArray(2)(common_link_info) as common_link_info,'server' as ip_type FROM tsg_galaxy_v3.connection_record_log where " + where + " group by IP"; + String clientIpSql = "SELECT common_client_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_c2s_byte_num+common_s2c_byte_num) as BYTES_SUM,groupUniqArray(2)(common_link_info_c2s) as common_link_info,'client' as ip_type FROM tsg_galaxy_v3.session_record where " + where + " group by IP"; + String serverIpSql = "SELECT common_server_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_c2s_byte_num+common_s2c_byte_num) as BYTES_SUM,groupUniqArray(2)(common_link_info_s2c) as common_link_info,'server' as ip_type FROM tsg_galaxy_v3.session_record where " + where + " group by IP"; return "SELECT * FROM((" + clientIpSql + ") UNION ALL (" + serverIpSql + "))"; } public static String getRelationshipFqdnAddressIpSql() { String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime; - String sslSql = "SELECT ssl_sni AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,groupUniqArray("+DISTINCT_CLIENT_IP_NUM+")(common_client_ip) AS DIST_CIP_RECENT,'TLS' AS schema_type FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'SSL' GROUP BY ssl_sni,common_server_ip"; - String httpSql = "SELECT http_host AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,groupUniqArray("+DISTINCT_CLIENT_IP_NUM+")(common_client_ip) AS DIST_CIP_RECENT,'HTTP' AS schema_type FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'HTTP' GROUP BY http_host,common_server_ip"; + String sslSql = "SELECT ssl_sni AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,groupUniqArray("+DISTINCT_CLIENT_IP_NUM+")(common_client_ip) AS DIST_CIP_RECENT,'TLS' AS schema_type FROM tsg_galaxy_v3.session_record WHERE " + where + " and common_schema_type = 'SSL' GROUP BY ssl_sni,common_server_ip"; + String httpSql = "SELECT http_host AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,groupUniqArray("+DISTINCT_CLIENT_IP_NUM+")(common_client_ip) AS DIST_CIP_RECENT,'HTTP' AS schema_type FROM tsg_galaxy_v3.session_record WHERE " + where + " and common_schema_type = 'HTTP' GROUP BY http_host,common_server_ip"; return "SELECT * FROM ((" + sslSql + ") UNION ALL (" + httpSql + "))WHERE FQDN != ''"; } public static String getRelationshipIpVisitFqdnSql() { String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime; - String httpSql = "SELECT http_host AS FQDN,common_client_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,'HTTP' AS schema_type FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'HTTP' GROUP BY http_host,common_client_ip"; - String sslSql = "SELECT ssl_sni AS FQDN,common_client_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,'TLS' AS schema_type FROM tsg_galaxy_v3.connection_record_log WHERE common_schema_type = 'SSL' GROUP BY ssl_sni,common_client_ip"; + String httpSql = "SELECT http_host AS FQDN,common_client_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,'HTTP' AS schema_type FROM tsg_galaxy_v3.session_record WHERE " + where + " and common_schema_type = 'HTTP' GROUP BY http_host,common_client_ip"; + String sslSql = "SELECT ssl_sni AS FQDN,common_client_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,'TLS' AS schema_type FROM tsg_galaxy_v3.session_record WHERE common_schema_type = 'SSL' GROUP BY ssl_sni,common_client_ip"; return "SELECT * FROM ((" + sslSql + ") UNION ALL (" + httpSql + "))WHERE FQDN != ''"; } public static String getVertexSubscriberSql() { String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime + " AND common_subscriber_id != '' AND radius_packet_type = 4 AND radius_acct_status_type = 1"; - return "SELECT common_subscriber_id,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME FROM radius_record_log WHERE" + where + " GROUP BY common_subscriber_id"; + return "SELECT common_subscriber_id,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME FROM radius_record WHERE" + where + " GROUP BY common_subscriber_id"; } public static String getRelationshipSubsciberLocateIpSql() { String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime + " AND common_subscriber_id != '' AND radius_framed_ip != '' AND radius_packet_type = 4 AND radius_acct_status_type = 1"; - return "SELECT common_subscriber_id,radius_framed_ip,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME,COUNT(*) as COUNT_TOTAL FROM radius_record_log WHERE" + where + " GROUP BY common_subscriber_id,radius_framed_ip"; + return "SELECT common_subscriber_id,radius_framed_ip,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME,COUNT(*) as COUNT_TOTAL FROM radius_record WHERE" + where + " GROUP BY common_subscriber_id,radius_framed_ip"; } private static long[] getTimeLimit() { diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/vertex/Ip.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/vertex/Ip.java index 9a44fac..4c42195 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/vertex/Ip.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/vertex/Ip.java @@ -33,10 +33,10 @@ public class Ip extends Vertex { } private void updateIpByType(BaseDocument newDocument, BaseDocument historyDocument) { - putSumAttribute(newDocument, historyDocument, "CLIENT_SESSION_COUNT"); - putSumAttribute(newDocument, historyDocument, "CLIENT_BYTES_SUM"); - putSumAttribute(newDocument, historyDocument, "SERVER_SESSION_COUNT"); - putSumAttribute(newDocument, historyDocument, "SERVER_BYTES_SUM"); +// putSumAttribute(newDocument, historyDocument, "CLIENT_SESSION_COUNT"); +// putSumAttribute(newDocument, historyDocument, "CLIENT_BYTES_SUM"); +// putSumAttribute(newDocument, historyDocument, "SERVER_SESSION_COUNT"); +// putSumAttribute(newDocument, historyDocument, "SERVER_BYTES_SUM"); } } diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java b/IP-learning-graph/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java index a13e44c..6326c0b 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java @@ -6,7 +6,6 @@ public class IpLearningApplicationTest { public static void main(String[] args) { UpdateGraphData updateGraphData = new UpdateGraphData(); -// updateGraphData.updateArango(); updateGraphData.updateArango2(); } diff --git a/IP-learning-graph/src/main/resources/application.properties b/IP-learning-graph/src/main/resources/application.properties index 7293dc1..5e15915 100644 --- a/IP-learning-graph/src/main/resources/application.properties +++ b/IP-learning-graph/src/main/resources/application.properties @@ -1,15 +1,13 @@ #arangoDB参数配置 -arangoDB.host=192.168.40.182 -#arangoDB.host=192.168.40.224 +arangoDB.host=192.168.44.12 arangoDB.port=8529 -arangoDB.user=upsert -arangoDB.password=ceiec2018 -arangoDB.DB.name=ip-learning-test -#arangoDB.DB.name=tsg_galaxy_v3 +arangoDB.user=root +arangoDB.password=ceiec2019 +arangoDB.DB.name=tsg_galaxy_v3 arangoDB.batch=100000 arangoDB.ttl=3600 -arangoDB.read.limit= +arangoDB.read.limit=10000000 update.arango.batch=10000 thread.pool.number=10 diff --git a/IP-learning-graph/src/main/resources/clickhouse.properties b/IP-learning-graph/src/main/resources/clickhouse.properties index 3b18aa4..dadbd20 100644 --- a/IP-learning-graph/src/main/resources/clickhouse.properties +++ b/IP-learning-graph/src/main/resources/clickhouse.properties @@ -1,9 +1,7 @@ drivers=ru.yandex.clickhouse.ClickHouseDriver mdb.user=default -db.id=192.168.40.186:8123/tsg_galaxy_v3?socket_timeout=300000 -mdb.password=111111 -#db.id=192.168.40.224:8123/tsg_galaxy_v3?socket_timeout=300000 -#mdb.password=ceiec2019 +db.id=192.168.44.67:8123/tsg_galaxy_v3?socket_timeout=3600000 +mdb.password=ceiec2019 initialsize=1 minidle=1 maxactive=50 diff --git a/IP-learning-graph/src/test/java/cn/ac/iie/TestReadArango.java b/IP-learning-graph/src/test/java/cn/ac/iie/TestReadArango.java deleted file mode 100644 index cffc50f..0000000 --- a/IP-learning-graph/src/test/java/cn/ac/iie/TestReadArango.java +++ /dev/null @@ -1,53 +0,0 @@ -package cn.ac.iie; - -import cn.ac.iie.dao.BaseArangoData; -import cn.ac.iie.utils.ArangoDBConnect; -import cn.ac.iie.utils.ExecutorThreadPool; -import com.arangodb.entity.BaseDocument; -import com.arangodb.entity.BaseEdgeDocument; -import org.junit.After; -import org.junit.Test; - -import java.util.Enumeration; -import java.util.concurrent.ConcurrentHashMap; - -public class TestReadArango { - private static ExecutorThreadPool pool = ExecutorThreadPool.getInstance(); - private static ArangoDBConnect arangoManger = ArangoDBConnect.getInstance(); - - private static BaseArangoData baseArangoData = new BaseArangoData(); - - - @Test - public void testReadFqdnFromArango() { - ConcurrentHashMap> historyData = - baseArangoData.readHistoryData("FQDN", BaseDocument.class); - printMap(historyData); - } - - @Test - public void testReadFqdnLocIpFromArango() { - ConcurrentHashMap> ip = - baseArangoData.readHistoryData("R_LOCATE_FQDN2IP", BaseEdgeDocument.class); - printMap(ip); - } - - private void printMap(ConcurrentHashMap> historyData) { - ConcurrentHashMap map = historyData.get(2); - Enumeration keys = map.keys(); - while (keys.hasMoreElements()) { - String key = keys.nextElement(); - T document = map.get(key); - System.out.println(document.toString()); - } - } - - - @After - public void clearSource() { - pool.shutdown(); - arangoManger.clean(); - } - - -}