diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java index 55d917c..fa5e88b 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java @@ -31,7 +31,9 @@ public class BaseArangoData { private ExecutorThreadPool threadPool = ExecutorThreadPool.getInstance(); - void readHistoryData(String table,ConcurrentHashMap> map,Class type){ + void readHistoryData(String table, + ConcurrentHashMap> map, + Class type) { try { LOG.info("开始更新"+table); long start = System.currentTimeMillis(); @@ -42,7 +44,8 @@ public class BaseArangoData { long[] timeRange = getTimeRange(table); for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { String sql = getQuerySql(timeRange, i, table); - ReadHistoryArangoData readHistoryArangoData = new ReadHistoryArangoData<>(arangoDBConnect, sql, map,type,table,countDownLatch); + ReadHistoryArangoData readHistoryArangoData = + new ReadHistoryArangoData<>(arangoDBConnect, sql, map,type,table,countDownLatch); threadPool.executor(readHistoryArangoData); } countDownLatch.await(); diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java index 2bfc5e3..7c89a18 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java @@ -35,7 +35,9 @@ public class BaseClickhouseData { private DruidPooledConnection connection; private Statement statement; - void baseDocumentFromClickhouse(HashMap>> newMap, Supplier getSqlSupplier, Function formatResultFunc){ + void baseDocumentFromClickhouse(HashMap>> newMap, + Supplier getSqlSupplier, + Function formatResultFunc) { long start = System.currentTimeMillis(); initializeMap(newMap); String sql = getSqlSupplier.get(); diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java b/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java index dbbd81f..8012f85 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java @@ -60,9 +60,9 @@ public class UpdateGraphData { LocateFqdn2Ip.class,BaseEdgeDocument.class, ReadClickhouseData::getRelationshipFqdnAddressIpSql,ReadClickhouseData::getRelationFqdnAddressIpDocument); - updateDocument(newRelationIpVisitFqdnMap,historyRelationIpVisitFqdnMap,"R_VISIT_IP2FQDN", - VisitIp2Fqdn.class,BaseEdgeDocument.class, - ReadClickhouseData::getRelationshipIpVisitFqdnSql,ReadClickhouseData::getRelationIpVisitFqdnDocument); +// updateDocument(newRelationIpVisitFqdnMap,historyRelationIpVisitFqdnMap,"R_VISIT_IP2FQDN", +// VisitIp2Fqdn.class,BaseEdgeDocument.class, +// ReadClickhouseData::getRelationshipIpVisitFqdnSql,ReadClickhouseData::getRelationIpVisitFqdnDocument); updateDocument(newRelationSubsciberLocateIpMap,historyRelationSubsciberLocateIpMap,"R_LOCATE_SUBSCRIBER2IP", LocateSubscriber2Ip.class,BaseEdgeDocument.class, @@ -106,7 +106,7 @@ public class UpdateGraphData { String.class, ConcurrentHashMap.class, CountDownLatch.class); - Document docTask = (Document)constructor.newInstance(tmpNewMap, arangoManger, collection, tmpHisMap, countDownLatch); + Document docTask = (Document)constructor.newInstance(tmpNewMap, arangoManger, collection, tmpHisMap, countDownLatch); pool.executor(docTask); } countDownLatch.await(); diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadClickhouseData.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadClickhouseData.java index 5700cb8..05c28ea 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadClickhouseData.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadClickhouseData.java @@ -23,9 +23,11 @@ public class ReadClickhouseData { private static final Logger LOG = LoggerFactory.getLogger(ReadClickhouseData.class); private static long[] timeLimit = getTimeLimit(); + private static long maxTime = timeLimit[0]; + private static long minTime = timeLimit[1]; public static final Integer DISTINCT_CLIENT_IP_NUM = ApplicationConfig.DISTINCT_CLIENT_IP_NUM; - public static final Integer RECENT_COUNT_HOUR = ApplicationConfig.RECENT_COUNT_HOUR; + static final Integer RECENT_COUNT_HOUR = ApplicationConfig.RECENT_COUNT_HOUR; public static final HashSet PROTOCOL_SET; static { @@ -248,8 +250,6 @@ public class ReadClickhouseData { } public static String getVertexFqdnSql() { - long maxTime = timeLimit[0]; - long minTime = timeLimit[1]; String where = "common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime; String sslSql = "SELECT ssl_sni AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'SSL' GROUP BY ssl_sni"; String httpSql = "SELECT http_host AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'HTTP' GROUP BY http_host"; @@ -257,8 +257,6 @@ public class ReadClickhouseData { } public static String getVertexIpSql() { - long maxTime = timeLimit[0]; - long minTime = timeLimit[1]; String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime; String clientIpSql = "SELECT common_client_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_c2s_byte_num) as BYTES_SUM,'client' as ip_type FROM tsg_galaxy_v3.connection_record_log where " + where + " group by IP"; String serverIpSql = "SELECT common_server_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_s2c_byte_num) as BYTES_SUM,'server' as ip_type FROM tsg_galaxy_v3.connection_record_log where " + where + " group by IP"; @@ -266,8 +264,6 @@ public class ReadClickhouseData { } public static String getRelationshipFqdnAddressIpSql() { - long maxTime = timeLimit[0]; - long minTime = timeLimit[1]; String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime; String sslSql = "SELECT ssl_sni AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,groupUniqArray("+DISTINCT_CLIENT_IP_NUM+")(common_client_ip) AS DIST_CIP_RECENT,'TLS' AS schema_type FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'SSL' GROUP BY ssl_sni,common_server_ip"; String httpSql = "SELECT http_host AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,groupUniqArray("+DISTINCT_CLIENT_IP_NUM+")(common_client_ip) AS DIST_CIP_RECENT,'HTTP' AS schema_type FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'HTTP' GROUP BY http_host,common_server_ip"; @@ -275,8 +271,6 @@ public class ReadClickhouseData { } public static String getRelationshipIpVisitFqdnSql() { - long maxTime = timeLimit[0]; - long minTime = timeLimit[1]; String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime; String httpSql = "SELECT http_host AS FQDN,common_client_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,'HTTP' AS schema_type FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'HTTP' GROUP BY http_host,common_client_ip"; String sslSql = "SELECT ssl_sni AS FQDN,common_client_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,'TLS' AS schema_type FROM tsg_galaxy_v3.connection_record_log WHERE common_schema_type = 'SSL' GROUP BY ssl_sni,common_client_ip"; @@ -284,15 +278,11 @@ public class ReadClickhouseData { } public static String getVertexSubscriberSql() { - long maxTime = timeLimit[0]; - long minTime = timeLimit[1]; String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime + " AND common_subscriber_id != '' AND radius_packet_type = 4 AND radius_acct_status_type = 1"; return "SELECT common_subscriber_id,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME FROM radius_record_log WHERE" + where + " GROUP BY common_subscriber_id"; } public static String getRelationshipSubsciberLocateIpSql() { - long maxTime = timeLimit[0]; - long minTime = timeLimit[1]; String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime + " AND common_subscriber_id != '' AND radius_framed_ip != '' AND radius_packet_type = 4 AND radius_acct_status_type = 1"; return "SELECT common_subscriber_id,radius_framed_ip,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME,COUNT(*) as COUNT_TOTAL FROM radius_record_log WHERE" + where + " GROUP BY common_subscriber_id,radius_framed_ip"; } diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadHistoryArangoData.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadHistoryArangoData.java index 9b26fdc..a4541f9 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadHistoryArangoData.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadHistoryArangoData.java @@ -8,11 +8,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; -import static cn.ac.iie.service.ingestion.ReadClickhouseData.RECENT_COUNT_HOUR; +import static cn.ac.iie.service.ingestion.ReadClickhouseData.*; /** * @author wlh @@ -23,14 +24,14 @@ public class ReadHistoryArangoData extends Thread { private ArangoDBConnect arangoConnect; private String query; - private ConcurrentHashMap> map; + private ConcurrentHashMap> map; private Class type; private String table; private CountDownLatch countDownLatch; public ReadHistoryArangoData(ArangoDBConnect arangoConnect, String query, - ConcurrentHashMap> map, + ConcurrentHashMap> map, Class type, String table, CountDownLatch countDownLatch) { @@ -55,6 +56,7 @@ public class ReadHistoryArangoData extends Thread { switch (table) { case "R_LOCATE_FQDN2IP": updateProtocolDocument(doc); + deleteDistinctClientIpByTime(doc); break; case "R_VISIT_IP2FQDN": updateProtocolDocument(doc); @@ -69,11 +71,11 @@ public class ReadHistoryArangoData extends Thread { long l = System.currentTimeMillis(); LOG.info(query + "\n读取" + i + "条数据,运行时间:" + (l - s)); } - }catch (Exception e){ + } catch (Exception e) { e.printStackTrace(); - }finally { + } finally { countDownLatch.countDown(); - LOG.info("本线程读取完毕,剩余线程数量:"+countDownLatch.getCount()); + LOG.info("本线程读取完毕,剩余线程数量:" + countDownLatch.getCount()); } } @@ -91,4 +93,22 @@ public class ReadHistoryArangoData extends Thread { } } + private void deleteDistinctClientIpByTime(T doc) { + ArrayList distCip = (ArrayList) doc.getAttribute("DIST_CIP"); + ArrayList distCipTs = (ArrayList) doc.getAttribute("DIST_CIP_TS"); + distCipTs.add(currentHour - RECENT_COUNT_HOUR * 3600); + Collections.sort(distCipTs); + int index = distCipTs.indexOf(currentHour - RECENT_COUNT_HOUR * 3600); + String[] distCipArr = new String[index]; + long[] disCipTsArr = new long[index]; + if (distCip.size() + 1 == distCipTs.size()){ + for (int i = 0; i < index; i++) { + distCipArr[i] = distCip.get(i); + disCipTsArr[i] = distCipTs.get(i); + } + } + doc.updateAttribute("DIST_CIP", distCipArr); + doc.updateAttribute("DIST_CIP_TS", disCipTsArr); + } + } diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/relationship/LocateFqdn2Ip.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/relationship/LocateFqdn2Ip.java index c8dca13..82d1a1a 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/relationship/LocateFqdn2Ip.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/relationship/LocateFqdn2Ip.java @@ -53,12 +53,12 @@ public class LocateFqdn2Ip extends Relationship { } private void updateDistinctClientIp(BaseEdgeDocument newEdgeDocument,BaseEdgeDocument edgeDocument){ - ArrayList distCip = (ArrayList) edgeDocument.getAttribute("DIST_CIP"); - ArrayList distCipTs = (ArrayList) edgeDocument.getAttribute("DIST_CIP_TS"); + String[] distCip = (String[]) edgeDocument.getAttribute("DIST_CIP"); + long[] distCipTs = (long[]) edgeDocument.getAttribute("DIST_CIP_TS"); HashMap distCipToTs = new HashMap<>(); - if (distCip.size() == distCipTs.size()){ - for (int i = 0;i < distCip.size();i++){ - distCipToTs.put(distCip.get(i),distCipTs.get(i)); + if (distCip.length == distCipTs.length){ + for (int i = 0;i < distCip.length;i++){ + distCipToTs.put(distCip[i],distCipTs[i]); } } Object[] distCipRecent = (Object[])newEdgeDocument.getAttribute("DIST_CIP"); diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/utils/ExecutorThreadPool.java b/IP-learning-graph/src/main/java/cn/ac/iie/utils/ExecutorThreadPool.java index b47e796..79ef293 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/utils/ExecutorThreadPool.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/utils/ExecutorThreadPool.java @@ -19,7 +19,7 @@ public class ExecutorThreadPool { private static void getThreadPool(){ ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() .setNameFormat("iplearning-application-pool-%d").build(); - pool = new ThreadPoolExecutor(5, ApplicationConfig.THREAD_POOL_NUMBER, + pool = new ThreadPoolExecutor(ApplicationConfig.THREAD_POOL_NUMBER, ApplicationConfig.THREAD_POOL_NUMBER, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy()); } diff --git a/IP-learning-graph/src/main/resources/application.properties b/IP-learning-graph/src/main/resources/application.properties index e639891..c09d5d5 100644 --- a/IP-learning-graph/src/main/resources/application.properties +++ b/IP-learning-graph/src/main/resources/application.properties @@ -1,10 +1,11 @@ #arangoDB参数配置 -arangoDB.host=192.168.40.182 +#arangoDB.host=192.168.40.182 +arangoDB.host=192.168.40.224 arangoDB.port=8529 arangoDB.user=root arangoDB.password=111111 -#arangoDB.DB.name=ip-learning-test -arangoDB.DB.name=ip-learning-test-0 +arangoDB.DB.name=ip-learning-test +#arangoDB.DB.name=ip-learning-test-0 arangoDB.batch=100000 arangoDB.ttl=3600 @@ -16,8 +17,8 @@ thread.await.termination.time=10 #读取clickhouse时间范围方式,0:读取过去一小时,1:指定时间范围 time.limit.type=1 -read.clickhouse.max.time=1595489408 -read.clickhouse.min.time=1593878400 +read.clickhouse.max.time=1596097930 +read.clickhouse.min.time=1593676894 update.interval=3600 distinct.client.ip.num=10000 diff --git a/IP-learning-graph/src/main/resources/clickhouse.properties b/IP-learning-graph/src/main/resources/clickhouse.properties index 1a84107..3b0ddbb 100644 --- a/IP-learning-graph/src/main/resources/clickhouse.properties +++ b/IP-learning-graph/src/main/resources/clickhouse.properties @@ -1,9 +1,9 @@ drivers=ru.yandex.clickhouse.ClickHouseDriver -#db.id=192.168.40.224:8123/tsg_galaxy_v3?socket_timeout=300000 -db.id=192.168.40.186:8123/tsg_galaxy_v3?socket_timeout=300000 +db.id=192.168.40.224:8123/tsg_galaxy_v3?socket_timeout=300000 +#db.id=192.168.40.186:8123/tsg_galaxy_v3?socket_timeout=300000 mdb.user=default -#mdb.password=ceiec2019 -mdb.password=111111 +mdb.password=ceiec2019 +#mdb.password=111111 initialsize=1 minidle=1 maxactive=50 diff --git a/IP-learning-graph/src/test/java/cn/ac/iie/TestList.java b/IP-learning-graph/src/test/java/cn/ac/iie/TestList.java new file mode 100644 index 0000000..83ce03c --- /dev/null +++ b/IP-learning-graph/src/test/java/cn/ac/iie/TestList.java @@ -0,0 +1,24 @@ +package cn.ac.iie; + +import java.util.ArrayList; +import java.util.Collections; + +public class TestList { + public static void main(String[] args) { + ArrayList integers = new ArrayList<>(); + integers.add(10); + integers.add(8); + integers.add(11); + integers.add(5); + integers.add(5); + integers.add(4); + integers.add(4); + integers.add(12); + Collections.sort(integers); + System.out.println(integers); + integers.add(5); + Collections.sort(integers); + System.out.println(integers); + System.out.println(integers.indexOf(5)); + } +}