From 0e926aa7d0a0207eca0f9a1d9244b3619c58cf32 Mon Sep 17 00:00:00 2001 From: wanglihui <949764788@qq.com> Date: Wed, 8 Jul 2020 19:44:46 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8A=BD=E5=8F=96=E5=85=AC=E5=85=B1=E6=96=B9?= =?UTF-8?q?=E6=B3=95=EF=BC=8C=E9=87=8D=E6=9E=84=E4=BB=A3=E7=A0=81=E9=80=BB?= =?UTF-8?q?=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/cn/ac/iie/dao/BaseArangoData.java | 88 +++-- .../cn/ac/iie/dao/BaseClickhouseData.java | 302 ++++-------------- .../java/cn/ac/iie/dao/UpdateGraphData.java | 114 +++++++ .../java/cn/ac/iie/etl/BaseUpdateEtl.java | 156 --------- .../cn/ac/iie/etl/fqdn/ArangoVFqdnToMap.java | 55 ---- .../java/cn/ac/iie/etl/fqdn/UpdateVFqdn.java | 60 ---- .../fqdn2ip/ArangoEFqdnAddressIpToMap.java | 53 --- .../iie/etl/fqdn2ip/UpdateEFqdnAddressIp.java | 57 ---- .../java/cn/ac/iie/etl/ip/ArangoVIpToMap.java | 54 ---- .../main/java/cn/ac/iie/etl/ip/UpdateVIP.java | 60 ---- .../etl/ip2fqdn/ArangoEIpVisitFqdnToMap.java | 49 --- .../iie/etl/ip2fqdn/UpdateEIpVisitFqdn.java | 60 ---- .../ac/iie/etl/read/ReadClickhouseData.java | 189 +++++++++++ .../iie/etl/read/ReadHistoryArangoData.java | 45 +++ .../iie/etl/relationship/LocateFqdn2Ip.java | 33 ++ .../ac/iie/etl/relationship/VisitIp2Fqdn.java | 19 ++ .../cn/ac/iie/etl/update/Relationship.java | 197 ++++++++++++ .../java/cn/ac/iie/etl/update/Vertex.java | 122 +++++++ .../main/java/cn/ac/iie/etl/vertex/Fqdn.java | 22 ++ .../main/java/cn/ac/iie/etl/vertex/Ip.java | 35 ++ .../iie/test/IpLearningApplicationTest.java | 67 +--- .../ac/iie/test/ReadArangoDBThreadTest.java | 95 ------ .../cn/ac/iie/utils/ExecutorThreadPool.java | 4 +- .../src/main/resources/application.properties | 6 +- .../src/test/java/cn/ac/iie/TestMap.java | 21 ++ 25 files changed, 911 insertions(+), 1052 deletions(-) create mode 100644 IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java delete mode 100644 
IP-learning-graph/src/main/java/cn/ac/iie/etl/BaseUpdateEtl.java delete mode 100644 IP-learning-graph/src/main/java/cn/ac/iie/etl/fqdn/ArangoVFqdnToMap.java delete mode 100644 IP-learning-graph/src/main/java/cn/ac/iie/etl/fqdn/UpdateVFqdn.java delete mode 100644 IP-learning-graph/src/main/java/cn/ac/iie/etl/fqdn2ip/ArangoEFqdnAddressIpToMap.java delete mode 100644 IP-learning-graph/src/main/java/cn/ac/iie/etl/fqdn2ip/UpdateEFqdnAddressIp.java delete mode 100644 IP-learning-graph/src/main/java/cn/ac/iie/etl/ip/ArangoVIpToMap.java delete mode 100644 IP-learning-graph/src/main/java/cn/ac/iie/etl/ip/UpdateVIP.java delete mode 100644 IP-learning-graph/src/main/java/cn/ac/iie/etl/ip2fqdn/ArangoEIpVisitFqdnToMap.java delete mode 100644 IP-learning-graph/src/main/java/cn/ac/iie/etl/ip2fqdn/UpdateEIpVisitFqdn.java create mode 100644 IP-learning-graph/src/main/java/cn/ac/iie/etl/read/ReadClickhouseData.java create mode 100644 IP-learning-graph/src/main/java/cn/ac/iie/etl/read/ReadHistoryArangoData.java create mode 100644 IP-learning-graph/src/main/java/cn/ac/iie/etl/relationship/LocateFqdn2Ip.java create mode 100644 IP-learning-graph/src/main/java/cn/ac/iie/etl/relationship/VisitIp2Fqdn.java create mode 100644 IP-learning-graph/src/main/java/cn/ac/iie/etl/update/Relationship.java create mode 100644 IP-learning-graph/src/main/java/cn/ac/iie/etl/update/Vertex.java create mode 100644 IP-learning-graph/src/main/java/cn/ac/iie/etl/vertex/Fqdn.java create mode 100644 IP-learning-graph/src/main/java/cn/ac/iie/etl/vertex/Ip.java delete mode 100644 IP-learning-graph/src/main/java/cn/ac/iie/test/ReadArangoDBThreadTest.java diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java index bba64fa..68ee89e 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java @@ -1,10 +1,7 @@ package cn.ac.iie.dao; import 
cn.ac.iie.config.ApplicationConfig; -import cn.ac.iie.etl.fqdn2ip.ArangoEFqdnAddressIpToMap; -import cn.ac.iie.etl.ip2fqdn.ArangoEIpVisitFqdnToMap; -import cn.ac.iie.etl.fqdn.ArangoVFqdnToMap; -import cn.ac.iie.etl.ip.ArangoVIpToMap; +import cn.ac.iie.etl.read.ReadHistoryArangoData; import cn.ac.iie.utils.ArangoDBConnect; import cn.ac.iie.utils.ExecutorThreadPool; import com.arangodb.ArangoCursor; @@ -18,8 +15,8 @@ import java.util.concurrent.ConcurrentHashMap; public class BaseArangoData { private static final Logger LOG = LoggerFactory.getLogger(BaseArangoData.class); - public static ConcurrentHashMap v_Fqdn_Map = new ConcurrentHashMap<>(); - public static ConcurrentHashMap v_Ip_Map = new ConcurrentHashMap<>(); + public static ConcurrentHashMap v_Fqdn_Map = new ConcurrentHashMap<>(); + public static ConcurrentHashMap v_Ip_Map = new ConcurrentHashMap<>(); public static ConcurrentHashMap e_Fqdn_Address_Ip_Map = new ConcurrentHashMap<>(); public static ConcurrentHashMap e_Ip_Visit_Fqdn_Map = new ConcurrentHashMap<>(); @@ -27,48 +24,40 @@ public class BaseArangoData { private static final ExecutorThreadPool threadPool = ExecutorThreadPool.getInstance(); - public static void BaseVFqdnDataMap() { - String sql = "LET FQDN = (FOR doc IN FQDN RETURN doc) return {max_time:MAX(FQDN[*].FIRST_FOUND_TIME),min_time:MIN(FQDN[*].FIRST_FOUND_TIME)}"; - long[] timeLimit = getTimeLimit(sql); - for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { - ArangoVFqdnToMap ArangoVFqdnToMap = new ArangoVFqdnToMap(arangoDBConnect, timeLimit[0], timeLimit[2],i); - threadPool.executor(ArangoVFqdnToMap); + public void baseDocumentDataMap(){ + long startA = System.currentTimeMillis(); + readHistoryData("FQDN", v_Fqdn_Map); + readHistoryData("IP", v_Ip_Map); + readHistoryData("R_LOCATE_FQDN2IP", e_Fqdn_Address_Ip_Map); + readHistoryData("R_VISIT_IP2FQDN", e_Ip_Visit_Fqdn_Map); + threadPool.shutdown(); + threadPool.awaitThreadTask(); + 
LOG.info("v_Fqdn_Map大小:"+BaseArangoData.v_Fqdn_Map.size()); + LOG.info("v_Ip_Map大小:"+BaseArangoData.v_Ip_Map.size()); + LOG.info("e_Fqdn_Address_Ip_Map大小:"+BaseArangoData.e_Fqdn_Address_Ip_Map.size()); + LOG.info("e_Ip_Visit_Fqdn_Map大小:"+BaseArangoData.e_Ip_Visit_Fqdn_Map.size()); + long lastA = System.currentTimeMillis(); + LOG.info("读取ArangoDb时间:"+(lastA - startA)); + } + + private void readHistoryData(String table, ConcurrentHashMap map){ + try { + long[] timeRange = getTimeRange(table); + for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { + String sql = getQuerySql(timeRange, i, table); + ReadHistoryArangoData readHistoryArangoData = new ReadHistoryArangoData(arangoDBConnect, sql, map); + threadPool.executor(readHistoryArangoData); + } + }catch (Exception e){ + e.printStackTrace(); } } - public static void BaseVIpDataMap() { - String sql = "LET IP = (FOR doc IN IP RETURN doc) return {max_time:MAX(IP[*].FIRST_FOUND_TIME),min_time:MIN(IP[*].FIRST_FOUND_TIME)}"; - long[] timeLimit = getTimeLimit(sql); - for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { - ArangoVIpToMap arangoVIpToMap = new ArangoVIpToMap(arangoDBConnect, timeLimit[0], timeLimit[2],i); - threadPool.executor(arangoVIpToMap); - } - } - - public static void BaseEFqdnAddressIpDataMap(){ - String sql = "LET e = (FOR doc IN R_LOCATE_FQDN2IP RETURN doc) return {max_time:MAX(e[*].FIRST_FOUND_TIME),min_time:MIN(e[*].FIRST_FOUND_TIME)}"; - long[] timeLimit = getTimeLimit(sql); - for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++){ - ArangoEFqdnAddressIpToMap arangoEFqdnAddressIpToMap = new ArangoEFqdnAddressIpToMap(arangoDBConnect, timeLimit[0], timeLimit[2], i); - threadPool.executor(arangoEFqdnAddressIpToMap); - } - } - - public static void BaseEIpVisitFqdnDataMap(){ - String sql = "LET e = (FOR doc IN R_VISIT_IP2FQDN RETURN doc) return {max_time:MAX(e[*].FIRST_FOUND_TIME),min_time:MIN(e[*].FIRST_FOUND_TIME)}"; - long[] timeLimit = getTimeLimit(sql); - for (int i = 
0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++){ - ArangoEIpVisitFqdnToMap arangoEIpVisitFqdnToMap = new ArangoEIpVisitFqdnToMap(arangoDBConnect, timeLimit[0], timeLimit[2], i); - threadPool.executor(arangoEIpVisitFqdnToMap); - } - } - - private static long[] getTimeLimit(String sql) { + private long[] getTimeRange(String table){ long minTime = 0L; long maxTime = 0L; - long diffTime = 0L; long startTime = System.currentTimeMillis(); -// LOG.info(sql); + String sql = "LET doc = (FOR doc IN "+table+" RETURN doc) return {max_time:MAX(doc[*].FIRST_FOUND_TIME),min_time:MIN(doc[*].FIRST_FOUND_TIME)}"; ArangoCursor timeDoc = arangoDBConnect.executorQuery(sql, BaseDocument.class); try { if (timeDoc != null){ @@ -79,14 +68,23 @@ public class BaseArangoData { } long lastTime = System.currentTimeMillis(); LOG.info(sql+"\n查询最大最小时间用时:" + (lastTime - startTime)); - diffTime = (maxTime - minTime) / ApplicationConfig.THREAD_POOL_NUMBER; }else { LOG.warn("获取ArangoDb时间范围为空"); } }catch (Exception e){ - LOG.error(e.toString()); + e.printStackTrace(); } - return new long[]{minTime, maxTime, diffTime}; + return new long[]{minTime, maxTime}; + + } + + private String getQuerySql(long[] timeRange,int threadNumber,String table){ + long minTime = timeRange[0]; + long maxTime = timeRange[1]; + long diffTime = (maxTime - minTime) / ApplicationConfig.THREAD_POOL_NUMBER; + long maxThreadTime = minTime + (threadNumber + 1)* diffTime; + long minThreadTime = minTime + threadNumber * diffTime; + return "FOR doc IN "+table+" filter doc.FIRST_FOUND_TIME >= "+minThreadTime+" and doc.FIRST_FOUND_TIME <= "+maxThreadTime+" RETURN doc"; } } diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java index 6c358e7..a08679d 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java @@ -1,10 +1,6 @@ package 
cn.ac.iie.dao; import cn.ac.iie.config.ApplicationConfig; -import cn.ac.iie.etl.fqdn2ip.UpdateEFqdnAddressIp; -import cn.ac.iie.etl.ip2fqdn.UpdateEIpVisitFqdn; -import cn.ac.iie.etl.fqdn.UpdateVFqdn; -import cn.ac.iie.etl.ip.UpdateVIP; import cn.ac.iie.utils.ClickhouseConnect; import com.alibaba.druid.pool.DruidPooledConnection; import com.arangodb.entity.BaseDocument; @@ -16,313 +12,133 @@ import java.sql.ResultSet; import java.sql.Statement; import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; -import java.util.regex.Pattern; +import java.util.Map; + +import static cn.ac.iie.etl.read.ReadClickhouseData.*; public class BaseClickhouseData { private static final Logger LOG = LoggerFactory.getLogger(BaseClickhouseData.class); private static final ClickhouseConnect manger = ClickhouseConnect.getInstance(); - private static HashMap> vFqdnMap = new HashMap<>(); - private static HashMap> vIpMap = new HashMap<>(); - private static HashMap>> eFqdnAddressIpMap = new HashMap<>(); - private static HashMap>> eIpVisitFqdnMap = new HashMap<>(); + static HashMap>> vFqdnMap = new HashMap<>(); + static HashMap>> vIpMap = new HashMap<>(); + static HashMap>> eFqdnAddressIpMap = new HashMap<>(); + static HashMap>> eIpVisitFqdnMap = new HashMap<>(); - private static Pattern pattern = Pattern.compile("^[\\d]*$"); + private DruidPooledConnection connection; + private Statement statement; - private static long[] getTimeLimit() { - long maxTime = System.currentTimeMillis() / 1000; - long minTime = maxTime - 3600; -// long maxTime = ApplicationConfig.READ_CLICKHOUSE_MAX_TIME; -// long minTime = ApplicationConfig.READ_CLICKHOUSE_MIN_TIME; - return new long[]{maxTime, minTime}; - } - - static { - for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { - vFqdnMap.put(i, new ArrayList<>()); - } + public void BaseVFqdn() { + initializeVertexMap(vFqdnMap); LOG.info("V_FQDN resultMap初始化完成"); - - for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { - 
vIpMap.put(i, new ArrayList<>()); - } - LOG.info("V_IP resultMap初始化完成"); - - for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { - eFqdnAddressIpMap.put(i, new HashMap<>()); - } - LOG.info("E_ADDRESS_V_FQDN_TO_V_IP resultMap初始化完成"); - - for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { - eIpVisitFqdnMap.put(i, new HashMap<>()); - } - LOG.info("E_VISIT_V_IP_TO_V_FQDN resultMap初始化完成"); - } - - public static void BaseVFqdn() { - String sql = getVFqdnSql(); long start = System.currentTimeMillis(); try { - DruidPooledConnection connection = manger.getConnection(); - Statement statement = connection.createStatement(); + connection = manger.getConnection(); + statement = connection.createStatement(); ResultSet resultSet = statement.executeQuery(sql); while (resultSet.next()) { - String fqdnName = resultSet.getString("FQDN"); - if (isDomain(fqdnName)){ - long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); - long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); - BaseDocument newDoc = new BaseDocument(); - newDoc.setKey(fqdnName); - newDoc.addAttribute("FQDN_NAME", fqdnName); - newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime); - newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime); + BaseDocument newDoc = getVertexFqdnDocument(resultSet); + if (newDoc != null) { + String fqdnName = newDoc.getKey(); int i = Math.abs(fqdnName.hashCode()) % ApplicationConfig.THREAD_POOL_NUMBER; - ArrayList documentList = vFqdnMap.getOrDefault(i, new ArrayList<>()); - documentList.add(newDoc); + HashMap> documentHashMap = vFqdnMap.getOrDefault(i, new HashMap<>()); + ArrayList documentArrayList = documentHashMap.getOrDefault(fqdnName, new ArrayList<>()); + documentArrayList.add(newDoc); + documentHashMap.put(fqdnName,documentArrayList); } } long last = System.currentTimeMillis(); LOG.info(sql + "\n读取clickhouse v_FQDN时间:" + (last - start)); - for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { - ArrayList baseDocumentList = 
vFqdnMap.get(i); - LOG.info("vFqdn baseDocumentHashMap大小:" + baseDocumentList.size()); - UpdateVFqdn updateVFqdn = new UpdateVFqdn(baseDocumentList); - updateVFqdn.run(); - } } catch (Exception e) { LOG.error(e.toString()); + }finally { + manger.clear(statement,connection); } } - public static void BaseVIp() { + public void BaseVIp() { + initializeVertexMap(vIpMap); + LOG.info("V_IP resultMap初始化完成"); String sql = getVIpSql(); long start = System.currentTimeMillis(); try { - DruidPooledConnection connection = manger.getConnection(); - Statement statement = connection.createStatement(); + connection = manger.getConnection(); + statement = connection.createStatement(); ResultSet resultSet = statement.executeQuery(sql); while (resultSet.next()) { - String ip = resultSet.getString("IP"); - String location = resultSet.getString("location"); - String[] locationSplit = location.split(";"); - String ipLocationNation; - String ipLocationRegion; - if (locationSplit.length == 3) { - ipLocationNation = locationSplit[0]; - ipLocationRegion = locationSplit[1]; - } else { - ipLocationNation = location; - ipLocationRegion = location; - } - long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); - long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); - BaseDocument newDoc = new BaseDocument(); - newDoc.setKey(ip); - newDoc.addAttribute("IP", ip); - newDoc.addAttribute("IP_LOCATION_NATION", ipLocationNation); - newDoc.addAttribute("IP_LOCATION_REGION", ipLocationRegion); - newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime); - newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime); + BaseDocument newDoc = getVertexIpDocument(resultSet); + String ip = newDoc.getKey(); int i = Math.abs(ip.hashCode()) % ApplicationConfig.THREAD_POOL_NUMBER; - ArrayList documentList = vIpMap.getOrDefault(i, new ArrayList<>()); - documentList.add(newDoc); + HashMap> documentHashMap = vIpMap.getOrDefault(i, new HashMap<>()); + ArrayList documentArrayList = documentHashMap.getOrDefault(ip, new 
ArrayList<>()); + documentArrayList.add(newDoc); + documentHashMap.put(ip,documentArrayList); } long last = System.currentTimeMillis(); LOG.info(sql + "\n读取clickhouse v_IP时间:" + (last - start)); - for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { - ArrayList baseDocumentList = vIpMap.get(i); - LOG.info("vIp baseDocumentHashMap大小:" + baseDocumentList.size()); - UpdateVIP updateVIp = new UpdateVIP(baseDocumentList); - updateVIp.run(); - } } catch (Exception e) { LOG.error(e.toString()); + }finally { + manger.clear(statement,connection); } } - public static void BaseEFqdnAddressIp() { - + public void BaseEFqdnAddressIp() { + initializeVertexMap(eFqdnAddressIpMap); + LOG.info("E_ADDRESS_V_FQDN_TO_V_IP resultMap初始化完成"); String sql = getEFqdnAddressIpSql(); long start = System.currentTimeMillis(); try { - DruidPooledConnection connection = manger.getConnection(); - Statement statement = connection.createStatement(); + connection = manger.getConnection(); + statement = connection.createStatement(); ResultSet resultSet = statement.executeQuery(sql); while (resultSet.next()) { - String commonSchemaType = resultSet.getString("common_schema_type"); - String vFqdn = resultSet.getString("FQDN"); - if (isDomain(vFqdn)){ - String vIp = resultSet.getString("common_server_ip"); - long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); - long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); - long countTotal = resultSet.getLong("COUNT_TOTAL"); - String[] distCipRecents = (String[]) resultSet.getArray("DIST_CIP_RECENT").getArray(); - - String key = vFqdn + "-" + vIp; - BaseEdgeDocument newDoc = new BaseEdgeDocument(); - newDoc.setKey(key); - newDoc.setFrom("FQDN/" + vFqdn); - newDoc.setTo("IP/" + vIp); - newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime); - newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime); - newDoc.addAttribute("COUNT_TOTAL", countTotal); - newDoc.addAttribute("DIST_CIP_RECENT", distCipRecents); - 
newDoc.addAttribute("DIST_CIP_TOTAL", distCipRecents); - - int hashMod = Math.abs(key.hashCode()) % ApplicationConfig.THREAD_POOL_NUMBER; - HashMap> documentHashMap = eFqdnAddressIpMap.getOrDefault(hashMod, new HashMap()); - - HashMap schemaHashMap = documentHashMap.getOrDefault(key, new HashMap<>()); - schemaHashMap.put(commonSchemaType, newDoc); - documentHashMap.put(key, schemaHashMap); - } + BaseEdgeDocument newDoc = getRelationFqdnAddressIpDocument(resultSet); + putMapByHashcode(resultSet, newDoc, eFqdnAddressIpMap); } long last = System.currentTimeMillis(); LOG.info(sql + "\n读取clickhouse EFqdnAddressIp时间:" + (last - start)); - for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { - HashMap> baseDocumentHashMap = eFqdnAddressIpMap.get(i); - LOG.info("EFqdnAddressIp baseDocumentHashMap大小:" + baseDocumentHashMap.size()); - UpdateEFqdnAddressIp updateEFqdnAddressIp = new UpdateEFqdnAddressIp(baseDocumentHashMap); - updateEFqdnAddressIp.run(); - } } catch (Exception e) { LOG.error(e.toString()); + }finally { + manger.clear(statement,connection); } } - public static void BaseEIpVisitFqdn() { + public void BaseEIpVisitFqdn() { + initializeVertexMap(eIpVisitFqdnMap); + LOG.info("E_VISIT_V_IP_TO_V_FQDN resultMap初始化完成"); String sql = getEIpVisitFqdnSql(); long start = System.currentTimeMillis(); try { - DruidPooledConnection connection = manger.getConnection(); - Statement statement = connection.createStatement(); + connection = manger.getConnection(); + statement = connection.createStatement(); ResultSet resultSet = statement.executeQuery(sql); - while (resultSet.next()) { - String commonSchemaType = resultSet.getString("common_schema_type"); - String vIp = resultSet.getString("common_client_ip"); - String vFqdn = resultSet.getString("FQDN"); - if (isDomain(vFqdn)){ - String key = vIp + "-" + vFqdn; - long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); - long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); - long countTotal = 
resultSet.getLong("COUNT_TOTAL"); - - BaseEdgeDocument newDoc = new BaseEdgeDocument(); - newDoc.setKey(key); - newDoc.setFrom("IP/" + vIp); - newDoc.setTo("FQDN/" + vFqdn); - newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime); - newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime); - newDoc.addAttribute("COUNT_TOTAL", countTotal); - int i = Math.abs(key.hashCode()) % ApplicationConfig.THREAD_POOL_NUMBER; - HashMap> documentHashMap = eIpVisitFqdnMap.getOrDefault(i, new HashMap()); - - HashMap schemaHashMap = documentHashMap.getOrDefault(key, new HashMap<>()); - schemaHashMap.put(commonSchemaType, newDoc); - documentHashMap.put(key, schemaHashMap); - } + BaseEdgeDocument newDoc = getRelationIpVisitFqdnDocument(resultSet); + putMapByHashcode(resultSet, newDoc, eIpVisitFqdnMap); } long last = System.currentTimeMillis(); LOG.info(sql + "\n读取clickhouse EIpVisitFqdn时间:" + (last - start)); - for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { - HashMap> baseDocumentHashMap = eIpVisitFqdnMap.get(i); - LOG.info("EIpVisitFqdn baseDocumentHashMap大小:" + baseDocumentHashMap.size()); - UpdateEIpVisitFqdn updateEIpVisitFqdn = new UpdateEIpVisitFqdn(baseDocumentHashMap); - updateEIpVisitFqdn.run(); - } } catch (Exception e) { LOG.error(e.toString()); + }finally { + manger.clear(statement,connection); } } - private static boolean isDomain(String fqdn) { + private void initializeVertexMap(Map map){ try { - String[] fqdnArr = fqdn.split("\\."); - if (fqdnArr.length < 4 || fqdnArr.length > 4) { - return true; + for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { + map.put(i, new HashMap<>()); } - - for (String f : fqdnArr) { - if (pattern.matcher(f).matches()) { - int i = Integer.parseInt(f); - if (i < 0 || i > 255) { - return true; - } - } else { - return true; - } - } - } catch (Exception e) { - LOG.error("解析域名 " + fqdn + " 失败:\n" + e.toString()); + }catch (Exception e){ + e.printStackTrace(); + LOG.error("初始化数据失败"); } - return false; } - private 
static String getVFqdnSql() { - long[] timeLimit = getTimeLimit(); - long maxTime = timeLimit[0]; - long minTime = timeLimit[1]; - String where = "common_recv_time >= " + minTime + " AND common_recv_time <= " + maxTime; - String sslSql = "SELECT ssl_sni AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'SSL' GROUP BY ssl_sni"; - String httpSql = "SELECT http_host AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'HTTP' GROUP BY http_host"; - return "SELECT FQDN,MAX( LAST_FOUND_TIME ) AS LAST_FOUND_TIME,MIN( FIRST_FOUND_TIME ) AS FIRST_FOUND_TIME FROM ((" + sslSql + ") UNION ALL (" + httpSql + ")) GROUP BY FQDN HAVING FQDN != ''"; - } - - private static String getVIpSql() { - long[] timeLimit = getTimeLimit(); - long maxTime = timeLimit[0]; - long minTime = timeLimit[1]; - String where = " common_recv_time >= " + minTime + " AND common_recv_time <= " + maxTime + " AND (common_schema_type = 'HTTP' or common_schema_type = 'SSL')"; - String clientIpSql = "SELECT common_client_ip AS IP, common_client_location AS location, common_recv_time FROM tsg_galaxy_v3.connection_record_log where " + where; - String serverIpSql = "SELECT common_server_ip AS IP, common_server_location AS location, common_recv_time FROM tsg_galaxy_v3.connection_record_log where " + where; - return "SELECT IP,location,MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,COUNT(*) AS IP_COUNT_TOTAL FROM((" + clientIpSql + ") UNION ALL (" + serverIpSql + ")) GROUP BY IP,location"; - } - - private static String getEFqdnAddressIpSql() { - long[] timeLimit = getTimeLimit(); - long maxTime = timeLimit[0]; - long minTime = timeLimit[1]; - String where = " common_recv_time >= " + minTime + " AND common_recv_time <= " + 
maxTime; - String sslSql = "SELECT ssl_sni AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,groupUniqArray(30)(common_client_ip) AS DIST_CIP_RECENT,'SSL' AS common_schema_type FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'SSL' GROUP BY ssl_sni,common_server_ip"; - String httpSql = "SELECT http_host AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,groupUniqArray(30)(common_client_ip) AS DIST_CIP_RECENT,'HTTP' AS common_schema_type FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'HTTP' GROUP BY http_host,common_server_ip"; - return "SELECT * FROM ((" + sslSql + ") UNION ALL (" + httpSql + "))WHERE FQDN != ''"; - } - - private static String getEIpVisitFqdnSql() { - long[] timeLimit = getTimeLimit(); - long maxTime = timeLimit[0]; - long minTime = timeLimit[1]; - String where = " common_recv_time >= " + minTime + " AND common_recv_time <= " + maxTime; - String httpSql = "SELECT http_host AS FQDN,common_client_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,'HTTP' AS common_schema_type FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'HTTP' GROUP BY http_host,common_client_ip"; - String sslSql = "SELECT ssl_sni AS FQDN,common_client_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,'SSL' AS common_schema_type FROM tsg_galaxy_v3.connection_record_log WHERE common_schema_type = 'SSL' GROUP BY ssl_sni,common_client_ip"; - return "SELECT * FROM ((" + sslSql + ") UNION ALL (" + httpSql + "))WHERE FQDN != ''"; - } - - @Deprecated - private static String commonSchemaGetFqdn(String commonSchemaType, ResultSet resultSet) { - String vFqdn = ""; - try { - switch (commonSchemaType) { - 
case "HTTP": - vFqdn = resultSet.getString("http_host"); - break; - case "SSL": - vFqdn = resultSet.getString("ssl_sni"); - break; - default: - LOG.warn("不支持该类型common_schema_type:" + commonSchemaType); - } - } catch (Exception e) { - LOG.error(e.getMessage()); - } - if (isDomain(vFqdn)) { - return vFqdn; - } - return ""; - } } diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java b/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java new file mode 100644 index 0000000..c38569e --- /dev/null +++ b/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java @@ -0,0 +1,114 @@ +package cn.ac.iie.dao; + +import cn.ac.iie.config.ApplicationConfig; +import cn.ac.iie.etl.relationship.LocateFqdn2Ip; +import cn.ac.iie.etl.relationship.VisitIp2Fqdn; +import cn.ac.iie.etl.vertex.Fqdn; +import cn.ac.iie.etl.vertex.Ip; +import cn.ac.iie.utils.ArangoDBConnect; +import cn.ac.iie.utils.ExecutorThreadPool; +import com.arangodb.entity.BaseDocument; +import com.arangodb.entity.BaseEdgeDocument; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.concurrent.CountDownLatch; + +public class UpdateGraphData { + private static final Logger LOG = LoggerFactory.getLogger(UpdateGraphData.class); + private static ExecutorThreadPool pool = ExecutorThreadPool.getInstance(); + private static ArangoDBConnect arangoManger = ArangoDBConnect.getInstance(); + + private CountDownLatch countDownLatch; + + public void updateArango(){ + long startC = System.currentTimeMillis(); + try { + BaseClickhouseData baseClickhouseData = new BaseClickhouseData(); + baseClickhouseData.BaseVFqdn(); + updateVertexFqdn(); + + baseClickhouseData.BaseVIp(); + updateVertexIp(); + + baseClickhouseData.BaseEFqdnAddressIp(); + updateRelationFqdnAddressIp(); + + baseClickhouseData.BaseEIpVisitFqdn(); + updateRelationIpVisitFqdn(); + }catch (Exception e){ + e.printStackTrace(); + }finally { + 
ArangoDBConnect.clean(); + } + long lastC = System.currentTimeMillis(); + LOG.info("更新ArangoDb时间:"+(lastC - startC)); + } + + private void updateVertexFqdn(){ + try { + countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); + for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { + HashMap> stringArrayListHashMap = BaseClickhouseData.vFqdnMap.get(i); + LOG.info("vFqdn baseDocumentHashMap大小:" + stringArrayListHashMap.size()); + Fqdn updateVFqdn = new Fqdn(stringArrayListHashMap, arangoManger, "FQDN", BaseArangoData.v_Fqdn_Map,countDownLatch); + updateVFqdn.run(); + } + countDownLatch.await(); + LOG.info("---------FQDN vertex 更新完毕---------"); + }catch (Exception e){ + e.printStackTrace(); + } + } + + private void updateVertexIp(){ + try { + countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); + for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { + HashMap> stringArrayListHashMap = BaseClickhouseData.vIpMap.get(i); + LOG.info("vIp baseDocumentHashMap大小:" + stringArrayListHashMap.size()); + Ip updateVIp = new Ip(stringArrayListHashMap, arangoManger, "IP", BaseArangoData.v_Ip_Map, countDownLatch); + updateVIp.run(); + } + countDownLatch.await(); + LOG.info("----------IP vertex 更新完毕-------------"); + }catch (Exception e){ + e.printStackTrace(); + } + } + + private void updateRelationFqdnAddressIp(){ + try { + countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); + for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { + HashMap> baseDocumentHashMap = BaseClickhouseData.eFqdnAddressIpMap.get(i); + LOG.info("EFqdnAddressIp baseDocumentHashMap大小:" + baseDocumentHashMap.size()); + LocateFqdn2Ip updateEFqdnAddressIp = new LocateFqdn2Ip(baseDocumentHashMap, arangoManger, "R_LOCATE_FQDN2IP", BaseArangoData.e_Fqdn_Address_Ip_Map, countDownLatch); + updateEFqdnAddressIp.run(); + } + countDownLatch.await(); + LOG.info("------------R_LOCATE_FQDN2IP relationship 更新完毕----------------"); + 
}catch (Exception e){ + e.printStackTrace(); + } + } + + private void updateRelationIpVisitFqdn(){ + try { + countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); + for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { + HashMap> baseDocumentHashMap = BaseClickhouseData.eIpVisitFqdnMap.get(i); + LOG.info("EIpVisitFqdn baseDocumentHashMap大小:" + baseDocumentHashMap.size()); + VisitIp2Fqdn updateEIpVisitFqdn = new VisitIp2Fqdn(baseDocumentHashMap,arangoManger,"R_VISIT_IP2FQDN",BaseArangoData.e_Ip_Visit_Fqdn_Map,countDownLatch); + updateEIpVisitFqdn.run(); + } + countDownLatch.await(); + LOG.info("---------------R_VISIT_IP2FQDN ralationship 更新完毕----------------"); + }catch (Exception e){ + e.printStackTrace(); + } + } + +} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/BaseUpdateEtl.java b/IP-learning-graph/src/main/java/cn/ac/iie/etl/BaseUpdateEtl.java deleted file mode 100644 index 56ae5a2..0000000 --- a/IP-learning-graph/src/main/java/cn/ac/iie/etl/BaseUpdateEtl.java +++ /dev/null @@ -1,156 +0,0 @@ -package cn.ac.iie.etl; - -import com.arangodb.entity.BaseEdgeDocument; - -import java.util.*; - -public class BaseUpdateEtl { - - public static BaseEdgeDocument mergeFqdn2IpBySchema(HashMap newEdgeDocumentSchemaMap){ - - BaseEdgeDocument newBaseEdgeDocument = new BaseEdgeDocument(); - Set schemaSets = newEdgeDocumentSchemaMap.keySet(); - Map properties = newBaseEdgeDocument.getProperties(); - - for (String schema : schemaSets) { - BaseEdgeDocument schemaEdgeDoc = newEdgeDocumentSchemaMap.get(schema); - if (!properties.isEmpty()){ - setFoundTime(properties,schemaEdgeDoc); - setDistinctClientIpBySchema(properties,schemaEdgeDoc); - }else { - newBaseEdgeDocument = schemaEdgeDoc; - properties = schemaEdgeDoc.getProperties(); - } - setSchemaCnt(schema,schemaEdgeDoc,properties); - } - properties.remove("COUNT_TOTAL"); - addSchemaProperty(properties); - - newBaseEdgeDocument.setProperties(properties); - return newBaseEdgeDocument; - } 
- - public static BaseEdgeDocument mergeIp2FqdnBySchema(HashMap newEdgeDocumentMap){ - BaseEdgeDocument newBaseEdgeDocument = new BaseEdgeDocument(); - Set schemaSets = newEdgeDocumentMap.keySet(); - Map properties = newBaseEdgeDocument.getProperties(); - - for (String schema : schemaSets) { - BaseEdgeDocument schemaEdgeDoc = newEdgeDocumentMap.get(schema); - if (!properties.isEmpty()){ - setFoundTime(properties,schemaEdgeDoc); - }else { - newBaseEdgeDocument = schemaEdgeDoc; - properties = schemaEdgeDoc.getProperties(); - } - setSchemaCnt(schema,schemaEdgeDoc,properties); - } - properties.remove("COUNT_TOTAL"); - addSchemaProperty(properties); - - newBaseEdgeDocument.setProperties(properties); - return newBaseEdgeDocument; - } - - public static void mergeIp2FqdnByHistory(BaseEdgeDocument newEdgeDocument,BaseEdgeDocument edgeDocument){ - updateCommonProperty(newEdgeDocument,edgeDocument); - } - - public static void mergeFqdn2IpByHistory(BaseEdgeDocument newEdgeDocument,BaseEdgeDocument edgeDocument){ - updateCommonProperty(newEdgeDocument,edgeDocument); - setDistinctClientIpByHistory(newEdgeDocument,edgeDocument); - } - - private static void addSchemaProperty(Map properties){ - if (!properties.containsKey("TLS_CNT_TOTAL")){ - properties.put("TLS_CNT_TOTAL",0L); - properties.put("TLS_CNT_RECENT",new long[7]); - }else if (!properties.containsKey("HTTP_CNT_TOTAL")){ - properties.put("HTTP_CNT_TOTAL",0L); - properties.put("HTTP_CNT_RECENT",new long[7]); - } - } - - private static void setDistinctClientIpByHistory(BaseEdgeDocument newEdgeDocument,BaseEdgeDocument edgeDocument){ - ArrayList distCipTotal = (ArrayList) edgeDocument.getAttribute("DIST_CIP_TOTAL"); - String[] distCipTotalsSrc = distCipTotal.toArray(new String[distCipTotal.size()]); - - Object[] distCipRecentsSrc = (Object[])newEdgeDocument.getAttribute("DIST_CIP_RECENT"); - if (distCipTotalsSrc.length == 30) { - Object[] distCipTotals = mergeClientIp(distCipTotalsSrc, distCipRecentsSrc); - 
edgeDocument.addAttribute("DIST_CIP_TOTAL", distCipTotals); - } - edgeDocument.addAttribute("DIST_CIP_RECENT", distCipRecentsSrc); - } - - private static void updateCommonProperty(BaseEdgeDocument newEdgeDocument,BaseEdgeDocument edgeDocument){ - Object lastFoundTime = newEdgeDocument.getAttribute("LAST_FOUND_TIME"); - edgeDocument.addAttribute("LAST_FOUND_TIME", lastFoundTime); - - setSchemaCntByHistory(edgeDocument,"TLS_CNT_RECENT","TLS_CNT_TOTAL",newEdgeDocument); - setSchemaCntByHistory(edgeDocument,"HTTP_CNT_RECENT","HTTP_CNT_TOTAL",newEdgeDocument); - - } - - private static void setSchemaCntByHistory(BaseEdgeDocument edgeDocument,String schema,String totalSchema,BaseEdgeDocument newEdgeDocument){ - long countTotal = Long.parseLong(newEdgeDocument.getAttribute(totalSchema).toString()); - long updateCountTotal = Long.parseLong(edgeDocument.getAttribute(totalSchema).toString()); - - ArrayList cntRecent = (ArrayList) edgeDocument.getAttribute(schema); - Long[] cntRecentsSrc = cntRecent.toArray(new Long[cntRecent.size()]); - Long[] cntRecentsDst = new Long[7]; - System.arraycopy(cntRecentsSrc, 0, cntRecentsDst, 1, cntRecentsSrc.length - 1); - cntRecentsDst[0] = countTotal; - - edgeDocument.addAttribute(schema, cntRecentsDst); - edgeDocument.addAttribute(totalSchema, countTotal + updateCountTotal); - } - - private static Object[] mergeClientIp(Object[] distCipTotalsSrc,Object[] distCipRecentsSrc){ - HashSet dIpSet = new HashSet<>(); - dIpSet.addAll(Arrays.asList(distCipRecentsSrc)); - dIpSet.addAll(Arrays.asList(distCipTotalsSrc)); - Object[] distCipTotals = dIpSet.toArray(); - if (distCipTotals.length > 30) { - System.arraycopy(distCipTotals, 0, distCipTotals, 0, 30); - } - return distCipTotals; - } - - private static void setDistinctClientIpBySchema(Map properties,BaseEdgeDocument schemaEdgeDoc){ - String[] schemaDistCipRecents = (String[]) schemaEdgeDoc.getAttribute("DIST_CIP_RECENT"); - String[] distCipRecents = (String[]) properties.get("DIST_CIP_RECENT"); - 
Object[] mergeClientIp = mergeClientIp(schemaDistCipRecents, distCipRecents); - properties.put("DIST_CIP_RECENT", mergeClientIp); - properties.put("DIST_CIP_TOTAL",mergeClientIp); - } - - private static void setFoundTime(Map properties,BaseEdgeDocument schemaEdgeDoc){ - long schemaFirstFoundTime = Long.parseLong(schemaEdgeDoc.getAttribute("FIRST_FOUND_TIME").toString()); - long firstFoundTime = Long.parseLong(properties.get("FIRST_FOUND_TIME").toString()); - properties.put("FIRST_FOUND_TIME",schemaFirstFoundTimelastFoundTime?schemaLastFoundTime:lastFoundTime); - } - - private static void setSchemaCnt(String schema,BaseEdgeDocument schemaEdgeDoc,Map properties){ - switch (schema) { - case "HTTP": - long httpCntTotal = Long.parseLong(schemaEdgeDoc.getAttribute("COUNT_TOTAL").toString()); - properties.put("HTTP_CNT_TOTAL", httpCntTotal); - long[] httpCntRecentsDst = new long[7]; - httpCntRecentsDst[0] = httpCntTotal; - properties.put("HTTP_CNT_RECENT", httpCntRecentsDst); - break; - case "SSL": - long tlsCntTotal = Long.parseLong(schemaEdgeDoc.getAttribute("COUNT_TOTAL").toString()); - properties.put("TLS_CNT_TOTAL", tlsCntTotal); - long[] tlsCntRecentsDst = new long[7]; - tlsCntRecentsDst[0] = tlsCntTotal; - properties.put("TLS_CNT_RECENT", tlsCntRecentsDst); - break; - } - } - -} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/fqdn/ArangoVFqdnToMap.java b/IP-learning-graph/src/main/java/cn/ac/iie/etl/fqdn/ArangoVFqdnToMap.java deleted file mode 100644 index be80b5d..0000000 --- a/IP-learning-graph/src/main/java/cn/ac/iie/etl/fqdn/ArangoVFqdnToMap.java +++ /dev/null @@ -1,55 +0,0 @@ -package cn.ac.iie.etl.fqdn; - -import cn.ac.iie.dao.BaseArangoData; -import cn.ac.iie.utils.ArangoDBConnect; -import com.arangodb.ArangoCursor; -import com.arangodb.entity.BaseDocument; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; - -public class ArangoVFqdnToMap implements Runnable { - private static final Logger LOG = 
LoggerFactory.getLogger(ArangoVFqdnToMap.class); - - private ArangoDBConnect arangoDBConnect; - private long finalMinTime; - private long diffTime; - private int threadNumber; - - public ArangoVFqdnToMap(ArangoDBConnect arangoDBConnect, long finalMinTime, long diffTime, int threadNumber) { - this.arangoDBConnect = arangoDBConnect; - this.finalMinTime = finalMinTime; - this.diffTime = diffTime; - this.threadNumber = threadNumber; - } - - public void run() { - String name = Thread.currentThread().getName(); - long maxThreadTime = finalMinTime + (threadNumber + 1)* diffTime; - long minThreadTime = finalMinTime + threadNumber * diffTime; - String query = "FOR doc IN FQDN filter doc.FIRST_FOUND_TIME >= "+minThreadTime+" and doc.FIRST_FOUND_TIME <= "+maxThreadTime+" RETURN doc"; -// LOG.info(name+":"+query); - long s = System.currentTimeMillis(); - ArangoCursor docs = arangoDBConnect.executorQuery(query, BaseDocument.class); - - try { - if (docs != null){ - List baseDocuments = docs.asListRemaining(); - int i = 0; - for (BaseDocument doc:baseDocuments){ - String key = doc.getKey(); - BaseArangoData.v_Fqdn_Map.put(key,doc); - i++; - } -// LOG.info(name+":共处理FQDN数据"+ i); - long l = System.currentTimeMillis(); - LOG.info(query+"\n处理FQDN数据"+ i+"条,运行时间:"+(l-s)); - }else { - LOG.warn("获取VFqdn异常,结果为空"); - } - }catch (Exception e){ - LOG.error(e.getMessage()); - } - } -} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/fqdn/UpdateVFqdn.java b/IP-learning-graph/src/main/java/cn/ac/iie/etl/fqdn/UpdateVFqdn.java deleted file mode 100644 index 0da523d..0000000 --- a/IP-learning-graph/src/main/java/cn/ac/iie/etl/fqdn/UpdateVFqdn.java +++ /dev/null @@ -1,60 +0,0 @@ -package cn.ac.iie.etl.fqdn; - -import cn.ac.iie.config.ApplicationConfig; -import cn.ac.iie.dao.BaseArangoData; -import cn.ac.iie.utils.ArangoDBConnect; -import com.arangodb.entity.BaseDocument; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; - -public class UpdateVFqdn 
implements Runnable{ - private static final Logger LOG = LoggerFactory.getLogger(UpdateVFqdn.class); - - private ArrayList documentList; - - private static final ArangoDBConnect arangoManger = ArangoDBConnect.getInstance(); - - public UpdateVFqdn(ArrayList documentList) { - this.documentList = documentList; - } - - @Override - public void run() { - ArrayList docInsert = new ArrayList<>(); - ArrayList docUpdate = new ArrayList<>(); - int i = 0; - try { - for (BaseDocument newDocument:documentList){ - String key = newDocument.getKey(); - if (!key.equals("")){ - i += 1; - BaseDocument document = BaseArangoData.v_Fqdn_Map.getOrDefault(key, null); - if (document != null){ - Object lastFoundTime = newDocument.getAttribute("LAST_FOUND_TIME"); - document.addAttribute("LAST_FOUND_TIME",lastFoundTime); -// docUpdate.add(document); - docInsert.add(document); - }else { - docInsert.add(newDocument); - } - if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH){ -// arangoManger.insertAndUpdate(docInsert,docUpdate,"FQDN"); - arangoManger.overwrite(docInsert,"FQDN"); - LOG.info("更新FQDN:"+i); - i = 0; - } - } - } - if (i != 0){ -// arangoManger.insertAndUpdate(docInsert,docUpdate,"FQDN"); - arangoManger.overwrite(docInsert,"FQDN"); - LOG.info("更新FQDN:"+i); - } - }catch (Exception e){ - LOG.error(e.getMessage()); - } - - } -} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/fqdn2ip/ArangoEFqdnAddressIpToMap.java b/IP-learning-graph/src/main/java/cn/ac/iie/etl/fqdn2ip/ArangoEFqdnAddressIpToMap.java deleted file mode 100644 index e1f0010..0000000 --- a/IP-learning-graph/src/main/java/cn/ac/iie/etl/fqdn2ip/ArangoEFqdnAddressIpToMap.java +++ /dev/null @@ -1,53 +0,0 @@ -package cn.ac.iie.etl.fqdn2ip; - -import cn.ac.iie.dao.BaseArangoData; -import cn.ac.iie.utils.ArangoDBConnect; -import com.arangodb.ArangoCursor; -import com.arangodb.entity.BaseEdgeDocument; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; - -public class ArangoEFqdnAddressIpToMap 
implements Runnable{ - private static final Logger LOG = LoggerFactory.getLogger(ArangoEFqdnAddressIpToMap.class); - - private ArangoDBConnect arangoDBConnect; - private long finalMinTime; - private long diffTime; - private int threadNumber; - - public ArangoEFqdnAddressIpToMap(ArangoDBConnect arangoDBConnect, long finalMinTime, long diffTime, int threadNumber) { - this.arangoDBConnect = arangoDBConnect; - this.finalMinTime = finalMinTime; - this.diffTime = diffTime; - this.threadNumber = threadNumber; - } - - public void run() { - String name = Thread.currentThread().getName(); - long maxThreadTime = finalMinTime + (threadNumber + 1) * diffTime; - long minThreadTime = finalMinTime + threadNumber * diffTime; - String query = "FOR doc IN R_LOCATE_FQDN2IP filter doc.FIRST_FOUND_TIME >= " + minThreadTime + " and doc.FIRST_FOUND_TIME <= " + maxThreadTime + " RETURN doc"; -// LOG.info(name + ":" + query); - long s = System.currentTimeMillis(); - try { - ArangoCursor docs = arangoDBConnect.executorQuery(query, BaseEdgeDocument.class); - if (docs != null){ - List baseDocuments = docs.asListRemaining(); - int i = 0; - for (BaseEdgeDocument doc : baseDocuments) { - String key = doc.getKey(); - BaseArangoData.e_Fqdn_Address_Ip_Map.put(key, doc); - i++; - } - long l = System.currentTimeMillis(); - LOG.info(query+ "\n处理R_LOCATE_FQDN2IP数据" + i + "条,运行时间:" + (l - s)); - }else { - LOG.warn("查询R_LOCATE_FQDN2IP异常,结果为空"); - } - }catch (Exception e){ - LOG.error(e.getMessage()); - } - } -} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/fqdn2ip/UpdateEFqdnAddressIp.java b/IP-learning-graph/src/main/java/cn/ac/iie/etl/fqdn2ip/UpdateEFqdnAddressIp.java deleted file mode 100644 index 0efdb72..0000000 --- a/IP-learning-graph/src/main/java/cn/ac/iie/etl/fqdn2ip/UpdateEFqdnAddressIp.java +++ /dev/null @@ -1,57 +0,0 @@ -package cn.ac.iie.etl.fqdn2ip; - -import cn.ac.iie.config.ApplicationConfig; -import cn.ac.iie.dao.BaseArangoData; -import cn.ac.iie.etl.BaseUpdateEtl; -import 
cn.ac.iie.utils.ArangoDBConnect; -import com.arangodb.entity.BaseEdgeDocument; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.*; - -public class UpdateEFqdnAddressIp implements Runnable { - private static final Logger LOG = LoggerFactory.getLogger(UpdateEFqdnAddressIp.class); - private HashMap> documentHashMap; - - private static final ArangoDBConnect arangoManger = ArangoDBConnect.getInstance(); - - public UpdateEFqdnAddressIp(HashMap> documentHashMap) { - this.documentHashMap = documentHashMap; - } - - @Override - public void run() { - Set keySet = documentHashMap.keySet(); - ArrayList docInsert = new ArrayList<>(); - int i = 0; - try { - for (String key : keySet) { - HashMap newEdgeDocumentSchmeaMap = documentHashMap.getOrDefault(key, null); - if (newEdgeDocumentSchmeaMap != null) { - BaseEdgeDocument newEdgeDocument = BaseUpdateEtl.mergeFqdn2IpBySchema(newEdgeDocumentSchmeaMap); - i += 1; - BaseEdgeDocument edgeDocument = BaseArangoData.e_Fqdn_Address_Ip_Map.getOrDefault(key, null); - if (edgeDocument != null) { - BaseUpdateEtl.mergeFqdn2IpByHistory(newEdgeDocument,edgeDocument); - docInsert.add(edgeDocument); - } else { - docInsert.add(newEdgeDocument); - } - if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) { - arangoManger.overwrite(docInsert, "R_LOCATE_FQDN2IP"); - LOG.info("更新R_LOCATE_FQDN2IP:" + i); - i = 0; - } - } - } - if (i != 0) { - arangoManger.overwrite(docInsert, "R_LOCATE_FQDN2IP"); - LOG.info("更新R_LOCATE_FQDN2IP:" + i); - } - } catch (Exception e) { - e.printStackTrace(); - LOG.error(e.toString()); - } - } -} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/ip/ArangoVIpToMap.java b/IP-learning-graph/src/main/java/cn/ac/iie/etl/ip/ArangoVIpToMap.java deleted file mode 100644 index d7423f5..0000000 --- a/IP-learning-graph/src/main/java/cn/ac/iie/etl/ip/ArangoVIpToMap.java +++ /dev/null @@ -1,54 +0,0 @@ -package cn.ac.iie.etl.ip; - -import cn.ac.iie.dao.BaseArangoData; -import cn.ac.iie.utils.ArangoDBConnect; 
-import com.arangodb.ArangoCursor; -import com.arangodb.entity.BaseDocument; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; - -public class ArangoVIpToMap implements Runnable { - private static final Logger LOG = LoggerFactory.getLogger(ArangoVIpToMap.class); - - private ArangoDBConnect arangoDBConnect; - private long finalMinTime; - private long diffTime; - private int threadNumber; - - public ArangoVIpToMap(ArangoDBConnect arangoDBConnect, long finalMinTime, long diffTime, int threadNumber) { - this.arangoDBConnect = arangoDBConnect; - this.finalMinTime = finalMinTime; - this.diffTime = diffTime; - this.threadNumber = threadNumber; - } - - public void run() { - String name = Thread.currentThread().getName(); - long maxThreadTime = finalMinTime + (threadNumber + 1) * diffTime; - long minThreadTime = finalMinTime + threadNumber * diffTime; - String query = "FOR doc IN IP filter doc.FIRST_FOUND_TIME >= " + minThreadTime + " and doc.FIRST_FOUND_TIME <= " + maxThreadTime + " RETURN doc"; -// LOG.info(name + ":" + query); - long s = System.currentTimeMillis(); - try { - ArangoCursor docs = arangoDBConnect.executorQuery(query, BaseDocument.class); - if (docs != null){ - List baseDocuments = docs.asListRemaining(); - int i = 0; - for (BaseDocument doc : baseDocuments) { - String key = doc.getKey(); - BaseArangoData.v_Ip_Map.put(key, doc); - i++; - } - long l = System.currentTimeMillis(); - LOG.info(query+ "\n处理IP数据" + i + "条,运行时间:" + (l - s)); - }else { - LOG.warn("获取VIP异常,结果为空"); - } - }catch (Exception e){ - LOG.error(e.getMessage()); - } - } - -} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/ip/UpdateVIP.java b/IP-learning-graph/src/main/java/cn/ac/iie/etl/ip/UpdateVIP.java deleted file mode 100644 index d36876b..0000000 --- a/IP-learning-graph/src/main/java/cn/ac/iie/etl/ip/UpdateVIP.java +++ /dev/null @@ -1,60 +0,0 @@ -package cn.ac.iie.etl.ip; - - -import cn.ac.iie.config.ApplicationConfig; -import 
cn.ac.iie.dao.BaseArangoData; -import cn.ac.iie.utils.ArangoDBConnect; -import com.arangodb.entity.BaseDocument; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; - -public class UpdateVIP implements Runnable { - private static final Logger LOG = LoggerFactory.getLogger(UpdateVIP.class); - - private ArrayList documentList; - - private static final ArangoDBConnect arangoManger = ArangoDBConnect.getInstance(); - - public UpdateVIP(ArrayList documentList) { - this.documentList = documentList; - } - - @Override - public void run() { - ArrayList docInsert = new ArrayList<>(); - ArrayList docUpdate = new ArrayList<>(); - int i = 0; - try { - for (BaseDocument newDocument:documentList){ - String key = newDocument.getKey(); - if (!key.equals("")){ - i += 1; - BaseDocument document = BaseArangoData.v_Ip_Map.getOrDefault(key, null); - if (document != null){ - Object lastFoundTime = newDocument.getAttribute("LAST_FOUND_TIME"); - document.addAttribute("LAST_FOUND_TIME",lastFoundTime); -// docUpdate.add(document); - docInsert.add(document); - }else { - docInsert.add(newDocument); - } - if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH){ -// arangoManger.insertAndUpdate(docInsert,docUpdate,"IP"); - arangoManger.overwrite(docInsert,"IP"); - LOG.info("更新IP:"+i); - i = 0; - } - } - } - if (i != 0){ -// arangoManger.insertAndUpdate(docInsert,docUpdate,"IP"); - arangoManger.overwrite(docInsert,"IP"); - LOG.info("更新IP:"+i); - } - }catch (Exception e){ - LOG.error(e.getMessage()); - } - } -} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/ip2fqdn/ArangoEIpVisitFqdnToMap.java b/IP-learning-graph/src/main/java/cn/ac/iie/etl/ip2fqdn/ArangoEIpVisitFqdnToMap.java deleted file mode 100644 index 6014487..0000000 --- a/IP-learning-graph/src/main/java/cn/ac/iie/etl/ip2fqdn/ArangoEIpVisitFqdnToMap.java +++ /dev/null @@ -1,49 +0,0 @@ -package cn.ac.iie.etl.ip2fqdn; - -import cn.ac.iie.dao.BaseArangoData; -import cn.ac.iie.utils.ArangoDBConnect; 
-import com.arangodb.ArangoCursor; -import com.arangodb.entity.BaseEdgeDocument; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; - -public class ArangoEIpVisitFqdnToMap implements Runnable { - private static final Logger LOG = LoggerFactory.getLogger(ArangoEIpVisitFqdnToMap.class); - private ArangoDBConnect arangoDBConnect; - private long finalMinTime; - private long diffTime; - private int threadNumber; - - public ArangoEIpVisitFqdnToMap(ArangoDBConnect arangoDBConnect, long finalMinTime, long diffTime, int threadNumber) { - this.arangoDBConnect = arangoDBConnect; - this.finalMinTime = finalMinTime; - this.diffTime = diffTime; - this.threadNumber = threadNumber; - } - - public void run() { - String name = Thread.currentThread().getName(); - long maxThreadTime = finalMinTime + (threadNumber + 1) * diffTime; - long minThreadTime = finalMinTime + threadNumber * diffTime; - String query = "FOR doc IN R_VISIT_IP2FQDN filter doc.FIRST_FOUND_TIME >= " + minThreadTime + " and doc.FIRST_FOUND_TIME <= " + maxThreadTime + " RETURN doc"; -// LOG.info(name + ":" + query); - long s = System.currentTimeMillis(); - ArangoCursor docs = arangoDBConnect.executorQuery(query, BaseEdgeDocument.class); - - if (docs != null){ - List baseDocuments = docs.asListRemaining(); - int i = 0; - for (BaseEdgeDocument doc : baseDocuments) { - String key = doc.getKey(); - BaseArangoData.e_Ip_Visit_Fqdn_Map.put(key, doc); - i++; - } - long l = System.currentTimeMillis(); - LOG.info(query+ "\n处理R_VISIT_IP2FQDN数据" + i + "条,运行时间:" + (l - s)); - }else { - LOG.warn("查询R_VISIT_IP2FQDN异常,结果为空"); - } - } -} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/ip2fqdn/UpdateEIpVisitFqdn.java b/IP-learning-graph/src/main/java/cn/ac/iie/etl/ip2fqdn/UpdateEIpVisitFqdn.java deleted file mode 100644 index fa4cc4f..0000000 --- a/IP-learning-graph/src/main/java/cn/ac/iie/etl/ip2fqdn/UpdateEIpVisitFqdn.java +++ /dev/null @@ -1,60 +0,0 @@ -package cn.ac.iie.etl.ip2fqdn; - 
-import cn.ac.iie.config.ApplicationConfig; -import cn.ac.iie.dao.BaseArangoData; -import cn.ac.iie.etl.BaseUpdateEtl; -import cn.ac.iie.utils.ArangoDBConnect; -import com.arangodb.entity.BaseEdgeDocument; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Set; - -public class UpdateEIpVisitFqdn implements Runnable { - private static final Logger LOG = LoggerFactory.getLogger(UpdateEIpVisitFqdn.class); - private HashMap> documentHashMap; - - private static final ArangoDBConnect arangoManger = ArangoDBConnect.getInstance(); - - public UpdateEIpVisitFqdn(HashMap> documentHashMap) { - this.documentHashMap = documentHashMap; - } - - @Override - public void run() { - Set keySet = documentHashMap.keySet(); - ArrayList docInsert = new ArrayList<>(); - int i = 0; - try { - for (String key : keySet) { - - HashMap newEdgeDocumentMap = documentHashMap.getOrDefault(key, null); - if (newEdgeDocumentMap != null) { - BaseEdgeDocument newEdgeDocument = BaseUpdateEtl.mergeIp2FqdnBySchema(newEdgeDocumentMap); - i += 1; - BaseEdgeDocument edgeDocument = BaseArangoData.e_Ip_Visit_Fqdn_Map.getOrDefault(key, null); - - if (edgeDocument != null) { - BaseUpdateEtl.mergeIp2FqdnByHistory(newEdgeDocument,edgeDocument); - docInsert.add(edgeDocument); - } else { - docInsert.add(newEdgeDocument); - } - if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) { - arangoManger.overwrite(docInsert,"R_VISIT_IP2FQDN"); - LOG.info("更新R_VISIT_IP2FQDN:" + i); - i = 0; - } - } - } - if (i != 0) { - arangoManger.overwrite(docInsert,"R_VISIT_IP2FQDN"); - LOG.info("更新R_VISIT_IP2FQDN:" + i); - } - } catch (Exception e) { - LOG.error(e.toString()); - } - } -} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/read/ReadClickhouseData.java b/IP-learning-graph/src/main/java/cn/ac/iie/etl/read/ReadClickhouseData.java new file mode 100644 index 0000000..550a2aa --- /dev/null +++ 
b/IP-learning-graph/src/main/java/cn/ac/iie/etl/read/ReadClickhouseData.java @@ -0,0 +1,189 @@ +package cn.ac.iie.etl.read; + +import cn.ac.iie.config.ApplicationConfig; +import com.arangodb.entity.BaseDocument; +import com.arangodb.entity.BaseEdgeDocument; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.regex.Pattern; + +public class ReadClickhouseData { + + private static Pattern pattern = Pattern.compile("^[\\d]*$"); + private static final Logger LOG = LoggerFactory.getLogger(ReadClickhouseData.class); + + public static BaseDocument getVertexFqdnDocument(ResultSet resultSet) throws SQLException { + String fqdnName = resultSet.getString("FQDN"); + BaseDocument newDoc = null; + if (isDomain(fqdnName)) { + long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); + long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); + newDoc = new BaseDocument(); + newDoc.setKey(fqdnName); + newDoc.addAttribute("FQDN_NAME", fqdnName); + newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime); + newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime); + } + return newDoc; + } + + public static BaseDocument getVertexIpDocument(ResultSet resultSet) throws SQLException { + String ip = resultSet.getString("IP"); + String location = resultSet.getString("location"); + String[] locationSplit = location.split(";"); + String ipLocationNation; + String ipLocationRegion; + if (locationSplit.length == 3) { + ipLocationNation = locationSplit[0]; + ipLocationRegion = locationSplit[1]; + } else { + ipLocationNation = location; + ipLocationRegion = location; + } + long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); + long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); + BaseDocument newDoc = new BaseDocument(); + newDoc.setKey(ip); + newDoc.addAttribute("IP", ip); + newDoc.addAttribute("IP_LOCATION_NATION", ipLocationNation); + 
newDoc.addAttribute("IP_LOCATION_REGION", ipLocationRegion); + newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime); + newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime); + return newDoc; + } + + public static BaseEdgeDocument getRelationFqdnAddressIpDocument(ResultSet resultSet) throws SQLException { + String vFqdn = resultSet.getString("FQDN"); + BaseEdgeDocument newDoc = null; + if (isDomain(vFqdn)) { + String vIp = resultSet.getString("common_server_ip"); + long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); + long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); + long countTotal = resultSet.getLong("COUNT_TOTAL"); + String[] distCipRecents = (String[]) resultSet.getArray("DIST_CIP_RECENT").getArray(); + + String key = vFqdn + "-" + vIp; + newDoc = new BaseEdgeDocument(); + newDoc.setKey(key); + newDoc.setFrom("FQDN/" + vFqdn); + newDoc.setTo("IP/" + vIp); + newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime); + newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime); + newDoc.addAttribute("COUNT_TOTAL", countTotal); + newDoc.addAttribute("DIST_CIP_RECENT", distCipRecents); + newDoc.addAttribute("DIST_CIP_TOTAL", distCipRecents); + + } + return newDoc; + } + + public static BaseEdgeDocument getRelationIpVisitFqdnDocument(ResultSet resultSet) throws SQLException { + BaseEdgeDocument newDoc = null; + String vFqdn = resultSet.getString("FQDN"); + if (isDomain(vFqdn)) { + String vIp = resultSet.getString("common_client_ip"); + String key = vIp + "-" + vFqdn; + long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); + long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); + long countTotal = resultSet.getLong("COUNT_TOTAL"); + + newDoc = new BaseEdgeDocument(); + newDoc.setKey(key); + newDoc.setFrom("IP/" + vIp); + newDoc.setTo("FQDN/" + vFqdn); + newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime); + newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime); + newDoc.addAttribute("COUNT_TOTAL", countTotal); + } + return newDoc; + } + 
+ public static void putMapByHashcode(ResultSet resultSet, BaseEdgeDocument newDoc, HashMap>> map) throws SQLException { + if (newDoc != null){ + String key = newDoc.getKey(); + String commonSchemaType = resultSet.getString("common_schema_type"); + int i = Math.abs(key.hashCode()) % ApplicationConfig.THREAD_POOL_NUMBER; + HashMap> documentHashMap = map.getOrDefault(i, new HashMap()); + + HashMap schemaHashMap = documentHashMap.getOrDefault(key, new HashMap<>()); + schemaHashMap.put(commonSchemaType, newDoc); + documentHashMap.put(key, schemaHashMap); + } + } + + public static boolean isDomain(String fqdn) { + try { + String[] fqdnArr = fqdn.split("\\."); + if (fqdnArr.length < 4 || fqdnArr.length > 4) { + return true; + } + + for (String f : fqdnArr) { + if (pattern.matcher(f).matches()) { + int i = Integer.parseInt(f); + if (i < 0 || i > 255) { + return true; + } + } else { + return true; + } + } + } catch (Exception e) { + LOG.error("解析域名 " + fqdn + " 失败:\n" + e.toString()); + } + return false; + } + + public static String getVFqdnSql() { + long[] timeLimit = getTimeLimit(); + long maxTime = timeLimit[0]; + long minTime = timeLimit[1]; + String where = "common_recv_time >= " + minTime + " AND common_recv_time <= " + maxTime; + String sslSql = "SELECT ssl_sni AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'SSL' GROUP BY ssl_sni"; + String httpSql = "SELECT http_host AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'HTTP' GROUP BY http_host"; + return "SELECT FQDN,MAX( LAST_FOUND_TIME ) AS LAST_FOUND_TIME,MIN( FIRST_FOUND_TIME ) AS FIRST_FOUND_TIME FROM ((" + sslSql + ") UNION ALL (" + httpSql + ")) GROUP BY FQDN HAVING FQDN != ''"; + } + + public static String getVIpSql() { + long[] timeLimit = 
getTimeLimit(); + long maxTime = timeLimit[0]; + long minTime = timeLimit[1]; + String where = " common_recv_time >= " + minTime + " AND common_recv_time <= " + maxTime + " AND (common_schema_type = 'HTTP' or common_schema_type = 'SSL')"; + String clientIpSql = "SELECT common_client_ip AS IP, common_client_location AS location, common_recv_time FROM tsg_galaxy_v3.connection_record_log where " + where; + String serverIpSql = "SELECT common_server_ip AS IP, common_server_location AS location, common_recv_time FROM tsg_galaxy_v3.connection_record_log where " + where; + return "SELECT IP,location,MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,COUNT(*) AS IP_COUNT_TOTAL FROM((" + clientIpSql + ") UNION ALL (" + serverIpSql + ")) GROUP BY IP,location"; + } + + public static String getEFqdnAddressIpSql() { + long[] timeLimit = getTimeLimit(); + long maxTime = timeLimit[0]; + long minTime = timeLimit[1]; + String where = " common_recv_time >= " + minTime + " AND common_recv_time <= " + maxTime; + String sslSql = "SELECT ssl_sni AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,groupUniqArray(30)(common_client_ip) AS DIST_CIP_RECENT,'SSL' AS common_schema_type FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'SSL' GROUP BY ssl_sni,common_server_ip"; + String httpSql = "SELECT http_host AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,groupUniqArray(30)(common_client_ip) AS DIST_CIP_RECENT,'HTTP' AS common_schema_type FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'HTTP' GROUP BY http_host,common_server_ip"; + return "SELECT * FROM ((" + sslSql + ") UNION ALL (" + httpSql + "))WHERE FQDN != ''"; + } + + public static String getEIpVisitFqdnSql() { + long[] timeLimit = getTimeLimit(); + long maxTime = 
timeLimit[0]; + long minTime = timeLimit[1]; + String where = " common_recv_time >= " + minTime + " AND common_recv_time <= " + maxTime; + String httpSql = "SELECT http_host AS FQDN,common_client_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,'HTTP' AS common_schema_type FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'HTTP' GROUP BY http_host,common_client_ip"; + String sslSql = "SELECT ssl_sni AS FQDN,common_client_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,'SSL' AS common_schema_type FROM tsg_galaxy_v3.connection_record_log WHERE common_schema_type = 'SSL' GROUP BY ssl_sni,common_client_ip"; + return "SELECT * FROM ((" + sslSql + ") UNION ALL (" + httpSql + "))WHERE FQDN != ''"; + } + + private static long[] getTimeLimit() { +// long maxTime = System.currentTimeMillis() / 1000; +// long minTime = maxTime - 3600; + long maxTime = ApplicationConfig.READ_CLICKHOUSE_MAX_TIME; + long minTime = ApplicationConfig.READ_CLICKHOUSE_MIN_TIME; + return new long[]{maxTime, minTime}; + } + +} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/read/ReadHistoryArangoData.java b/IP-learning-graph/src/main/java/cn/ac/iie/etl/read/ReadHistoryArangoData.java new file mode 100644 index 0000000..c4ca68b --- /dev/null +++ b/IP-learning-graph/src/main/java/cn/ac/iie/etl/read/ReadHistoryArangoData.java @@ -0,0 +1,45 @@ +package cn.ac.iie.etl.read; + +import cn.ac.iie.utils.ArangoDBConnect; +import com.arangodb.ArangoCursor; +import com.arangodb.entity.BaseEdgeDocument; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +/** + * @author wlh + * 多线程全量读取arangoDb历史数据,封装到map + */ +public class ReadHistoryArangoData extends Thread { + private static final Logger LOG = LoggerFactory.getLogger(ReadHistoryArangoData.class); + + private 
ArangoDBConnect arangoDBConnect; + private String query; + private ConcurrentHashMap map; + + public ReadHistoryArangoData(ArangoDBConnect arangoDBConnect, String query, ConcurrentHashMap map) { + this.arangoDBConnect = arangoDBConnect; + this.query = query; + this.map = map; + } + + @Override + public void run() { + long s = System.currentTimeMillis(); + ArangoCursor docs = arangoDBConnect.executorQuery(query, BaseEdgeDocument.class); + if (docs != null){ + List baseDocuments = docs.asListRemaining(); + int i = 0; + for (BaseEdgeDocument doc : baseDocuments) { + String key = doc.getKey(); + map.put(key, doc); + i++; + } + long l = System.currentTimeMillis(); + LOG.info(query+ "\n处理IP数据" + i + "条,运行时间:" + (l - s)); + } + } +} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/relationship/LocateFqdn2Ip.java b/IP-learning-graph/src/main/java/cn/ac/iie/etl/relationship/LocateFqdn2Ip.java new file mode 100644 index 0000000..cd92003 --- /dev/null +++ b/IP-learning-graph/src/main/java/cn/ac/iie/etl/relationship/LocateFqdn2Ip.java @@ -0,0 +1,33 @@ +package cn.ac.iie.etl.relationship; + +import cn.ac.iie.etl.update.Relationship; +import cn.ac.iie.utils.ArangoDBConnect; +import com.arangodb.entity.BaseEdgeDocument; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; + +public class LocateFqdn2Ip extends Relationship { + + public LocateFqdn2Ip(HashMap> newDocumentHashMap, + ArangoDBConnect arangoManger, + String collectionName, + ConcurrentHashMap historyDocumentMap, + CountDownLatch countDownLatch) { + super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap,countDownLatch); + } + + @Override + protected void mergeFunction(Map properties, BaseEdgeDocument schemaEdgeDoc){ + super.mergeFunction(properties,schemaEdgeDoc); + super.mergeDistinctClientIp(properties,schemaEdgeDoc); + } + + @Override + protected void updateFunction(BaseEdgeDocument 
newEdgeDocument, BaseEdgeDocument historyEdgeDocument) { + super.updateFunction(newEdgeDocument, historyEdgeDocument); + super.updateDistinctClientIp(newEdgeDocument, historyEdgeDocument); + } +} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/relationship/VisitIp2Fqdn.java b/IP-learning-graph/src/main/java/cn/ac/iie/etl/relationship/VisitIp2Fqdn.java new file mode 100644 index 0000000..d2b66c3 --- /dev/null +++ b/IP-learning-graph/src/main/java/cn/ac/iie/etl/relationship/VisitIp2Fqdn.java @@ -0,0 +1,19 @@ +package cn.ac.iie.etl.relationship; + +import cn.ac.iie.etl.update.Relationship; +import cn.ac.iie.utils.ArangoDBConnect; +import com.arangodb.entity.BaseEdgeDocument; + +import java.util.HashMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; + +public class VisitIp2Fqdn extends Relationship { + public VisitIp2Fqdn(HashMap> newDocumentHashMap, + ArangoDBConnect arangoManger, + String collectionName, + ConcurrentHashMap historyDocumentMap, + CountDownLatch countDownLatch) { + super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap,countDownLatch); + } +} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/update/Relationship.java b/IP-learning-graph/src/main/java/cn/ac/iie/etl/update/Relationship.java new file mode 100644 index 0000000..9b5a3bb --- /dev/null +++ b/IP-learning-graph/src/main/java/cn/ac/iie/etl/update/Relationship.java @@ -0,0 +1,197 @@ +package cn.ac.iie.etl.update; + +import cn.ac.iie.config.ApplicationConfig; +import cn.ac.iie.utils.ArangoDBConnect; +import com.arangodb.entity.BaseEdgeDocument; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; + +public class Relationship extends Thread { + + protected HashMap> newDocumentHashMap; + protected ArangoDBConnect arangoManger; + protected String collectionName; + protected ConcurrentHashMap historyDocumentMap; + protected CountDownLatch countDownLatch; 
+ + public Relationship(HashMap> newDocumentHashMap, + ArangoDBConnect arangoManger, + String collectionName, + ConcurrentHashMap historyDocumentMap, + CountDownLatch countDownLatch) { + this.newDocumentHashMap = newDocumentHashMap; + this.arangoManger = arangoManger; + this.collectionName = collectionName; + this.historyDocumentMap = historyDocumentMap; + this.countDownLatch = countDownLatch; + } + + @Override + public void run() { + Set keySet = newDocumentHashMap.keySet(); + ArrayList docInsert = new ArrayList<>(); + int i = 0; + try { + for (String key : keySet) { + HashMap newEdgeDocumentSchemaMap = newDocumentHashMap.getOrDefault(key, null); + if (newEdgeDocumentSchemaMap != null) { + BaseEdgeDocument newEdgeDocument = mergeRelationship(newEdgeDocumentSchemaMap); + i += 1; + BaseEdgeDocument historyEdgeDocument = historyDocumentMap.getOrDefault(key, null); + updateRelationship(newEdgeDocument,historyEdgeDocument,docInsert); + if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) { + arangoManger.overwrite(docInsert, collectionName); + System.out.println("更新"+collectionName+":" + i); + i = 0; + } + } + } + if (i != 0) { + arangoManger.overwrite(docInsert, collectionName); + System.out.println("更新"+collectionName+":" + i); + } + } catch (Exception e) { + e.printStackTrace(); + System.out.println(e.toString()); + }finally { + countDownLatch.countDown(); + } + } + + private BaseEdgeDocument mergeRelationship(HashMap newEdgeDocumentSchemaMap) { + BaseEdgeDocument newBaseEdgeDocument = new BaseEdgeDocument(); + Set schemaSets = newEdgeDocumentSchemaMap.keySet(); + Map properties = newBaseEdgeDocument.getProperties(); + + for (String schema : schemaSets) { + BaseEdgeDocument schemaEdgeDoc = newEdgeDocumentSchemaMap.get(schema); + if (!properties.isEmpty()) { + mergeFunction(properties, schemaEdgeDoc); + } else { + newBaseEdgeDocument = schemaEdgeDoc; + properties = schemaEdgeDoc.getProperties(); + } + setSchemaCount(schema, schemaEdgeDoc, properties); + } + 
properties.remove("COUNT_TOTAL"); + checkSchemaProperty(properties); + + newBaseEdgeDocument.setProperties(properties); + return newBaseEdgeDocument; + } + + private void updateRelationship(BaseEdgeDocument newEdgeDocument,BaseEdgeDocument historyEdgeDocument,ArrayList docInsert){ + if (historyEdgeDocument != null) { + updateFunction(newEdgeDocument, historyEdgeDocument); + docInsert.add(historyEdgeDocument); + } else { + docInsert.add(newEdgeDocument); + } + } + + protected void updateFunction(BaseEdgeDocument newEdgeDocument,BaseEdgeDocument historyEdgeDocument){ + updateFoundTime(newEdgeDocument,historyEdgeDocument); + setSchemaCntByHistory(historyEdgeDocument,"TLS_CNT_RECENT","TLS_CNT_TOTAL",newEdgeDocument); + setSchemaCntByHistory(historyEdgeDocument,"HTTP_CNT_RECENT","HTTP_CNT_TOTAL",newEdgeDocument); +// updateDistinctClientIp(newEdgeDocument,historyEdgeDocument); + } + + protected void updateFoundTime(BaseEdgeDocument newEdgeDocument,BaseEdgeDocument historyEdgeDocument){ + Object lastFoundTime = newEdgeDocument.getAttribute("LAST_FOUND_TIME"); + historyEdgeDocument.addAttribute("LAST_FOUND_TIME", lastFoundTime); + } + + protected void setSchemaCntByHistory(BaseEdgeDocument historyEdgeDocument,String schema,String totalSchema,BaseEdgeDocument newEdgeDocument){ + long countTotal = Long.parseLong(newEdgeDocument.getAttribute(totalSchema).toString()); + long updateCountTotal = Long.parseLong(historyEdgeDocument.getAttribute(totalSchema).toString()); + + ArrayList cntRecent = (ArrayList) historyEdgeDocument.getAttribute(schema); + Long[] cntRecentsSrc = cntRecent.toArray(new Long[cntRecent.size()]); + Long[] cntRecentsDst = new Long[7]; + System.arraycopy(cntRecentsSrc, 0, cntRecentsDst, 1, cntRecentsSrc.length - 1); + cntRecentsDst[0] = countTotal; + + historyEdgeDocument.addAttribute(schema, cntRecentsDst); + historyEdgeDocument.addAttribute(totalSchema, countTotal + updateCountTotal); + } + + protected void mergeFunction(Map properties, BaseEdgeDocument 
schemaEdgeDoc) { + mergeFoundTime(properties, schemaEdgeDoc); +// mergeDistinctClientIp(properties,schemaEdgeDoc); + } + + protected void mergeDistinctClientIp(Map properties, BaseEdgeDocument schemaEdgeDoc){ + String[] schemaDistCipRecents = (String[]) schemaEdgeDoc.getAttribute("DIST_CIP_RECENT"); + String[] distCipRecents = (String[]) properties.get("DIST_CIP_RECENT"); + Object[] mergeClientIp = distinctIp(schemaDistCipRecents, distCipRecents); + properties.put("DIST_CIP_RECENT", mergeClientIp); + properties.put("DIST_CIP_TOTAL",mergeClientIp); + } + + protected void updateDistinctClientIp(BaseEdgeDocument newEdgeDocument,BaseEdgeDocument edgeDocument){ + ArrayList distCipTotal = (ArrayList) edgeDocument.getAttribute("DIST_CIP_TOTAL"); + String[] distCipTotalsSrc = distCipTotal.toArray(new String[distCipTotal.size()]); + + Object[] distCipRecentsSrc = (Object[])newEdgeDocument.getAttribute("DIST_CIP_RECENT"); + if (distCipTotalsSrc.length == 30) { + Object[] distCipTotals = distinctIp(distCipTotalsSrc, distCipRecentsSrc); + edgeDocument.addAttribute("DIST_CIP_TOTAL", distCipTotals); + } + edgeDocument.addAttribute("DIST_CIP_RECENT", distCipRecentsSrc); + } + + protected Object[] distinctIp(Object[] distCipTotalsSrc,Object[] distCipRecentsSrc){ + HashSet dIpSet = new HashSet<>(); + dIpSet.addAll(Arrays.asList(distCipRecentsSrc)); + dIpSet.addAll(Arrays.asList(distCipTotalsSrc)); + Object[] distCipTotals = dIpSet.toArray(); + if (distCipTotals.length > 30) { + System.arraycopy(distCipTotals, 0, distCipTotals, 0, 30); + } + return distCipTotals; + } + + protected void mergeFoundTime(Map properties, BaseEdgeDocument schemaEdgeDoc) { + long schemaFirstFoundTime = Long.parseLong(schemaEdgeDoc.getAttribute("FIRST_FOUND_TIME").toString()); + long firstFoundTime = Long.parseLong(properties.get("FIRST_FOUND_TIME").toString()); + properties.put("FIRST_FOUND_TIME", schemaFirstFoundTime < firstFoundTime ? 
schemaFirstFoundTime : firstFoundTime); + long schemaLastFoundTime = Long.parseLong(schemaEdgeDoc.getAttribute("LAST_FOUND_TIME").toString()); + long lastFoundTime = Long.parseLong(properties.get("LAST_FOUND_TIME").toString()); + properties.put("LAST_FOUND_TIME", schemaLastFoundTime > lastFoundTime ? schemaLastFoundTime : lastFoundTime); + } + + protected void setSchemaCount(String schema, BaseEdgeDocument schemaEdgeDoc, Map properties) { + switch (schema) { + case "HTTP": + long httpCntTotal = Long.parseLong(schemaEdgeDoc.getAttribute("COUNT_TOTAL").toString()); + properties.put("HTTP_CNT_TOTAL", httpCntTotal); + long[] httpCntRecentsDst = new long[7]; + httpCntRecentsDst[0] = httpCntTotal; + properties.put("HTTP_CNT_RECENT", httpCntRecentsDst); + break; + case "SSL": + long tlsCntTotal = Long.parseLong(schemaEdgeDoc.getAttribute("COUNT_TOTAL").toString()); + properties.put("TLS_CNT_TOTAL", tlsCntTotal); + long[] tlsCntRecentsDst = new long[7]; + tlsCntRecentsDst[0] = tlsCntTotal; + properties.put("TLS_CNT_RECENT", tlsCntRecentsDst); + break; + default: + break; + } + } + + protected void checkSchemaProperty(Map properties){ + if (!properties.containsKey("TLS_CNT_TOTAL")){ + properties.put("TLS_CNT_TOTAL",0L); + properties.put("TLS_CNT_RECENT",new long[7]); + }else if (!properties.containsKey("HTTP_CNT_TOTAL")){ + properties.put("HTTP_CNT_TOTAL",0L); + properties.put("HTTP_CNT_RECENT",new long[7]); + } + } + + +} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/update/Vertex.java b/IP-learning-graph/src/main/java/cn/ac/iie/etl/update/Vertex.java new file mode 100644 index 0000000..1b7ba73 --- /dev/null +++ b/IP-learning-graph/src/main/java/cn/ac/iie/etl/update/Vertex.java @@ -0,0 +1,122 @@ +package cn.ac.iie.etl.update; + +import cn.ac.iie.config.ApplicationConfig; +import cn.ac.iie.utils.ArangoDBConnect; +import com.arangodb.entity.BaseDocument; +import com.arangodb.entity.BaseEdgeDocument; + +import java.util.ArrayList; +import java.util.HashMap; 
+import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; + +/** + * @author wlh + * 多线程更新vertex数据 + */ +public class Vertex extends Thread{ + + protected HashMap> newDocumentHashMap; + protected ArangoDBConnect arangoManger; + protected String collectionName; + protected ConcurrentHashMap historyDocumentMap; + protected CountDownLatch countDownLatch; + + public Vertex(HashMap> newDocumentHashMap, + ArangoDBConnect arangoManger, + String collectionName, + ConcurrentHashMap historyDocumentMap, + CountDownLatch countDownLatch){ + this.newDocumentHashMap = newDocumentHashMap; + this.arangoManger = arangoManger; + this.collectionName = collectionName; + this.historyDocumentMap = historyDocumentMap; + this.countDownLatch = countDownLatch; + } + + @Override + public void run() { + Set keySet = newDocumentHashMap.keySet(); + ArrayList docInsert = new ArrayList<>(); + int i = 0; + try { + for (String key:keySet){ + ArrayList documentArrayList = newDocumentHashMap.getOrDefault(key, null); + BaseDocument newDocument = mergeVertex(documentArrayList); + if (newDocument != null){ + i += 1; + BaseDocument historyDocument = historyDocumentMap.getOrDefault(key, null); + updateVertex(newDocument,historyDocument,docInsert); + } + if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH){ + arangoManger.overwrite(docInsert,collectionName); + System.out.println("更新"+i); + i = 0; + } + } + if (i != 0){ + arangoManger.overwrite(docInsert,collectionName); + System.out.println("更新"+i); + } + }catch (Exception e){ + e.printStackTrace(); + }finally { + countDownLatch.countDown(); + } + } + + private void updateVertex(BaseDocument newDocument,BaseDocument historyDocument,ArrayList docInsert){ + if (historyDocument != null){ + updateFunction(newDocument,historyDocument); + docInsert.add(historyDocument); + }else { + docInsert.add(newDocument); + } + } + + protected void updateFunction(BaseDocument newDocument,BaseDocument 
historyDocument){ + updateFoundTime(newDocument,historyDocument); + } + + private void updateFoundTime(BaseDocument newDocument,BaseDocument historyDocument){ + Object lastFoundTime = newDocument.getAttribute("LAST_FOUND_TIME"); + historyDocument.addAttribute("LAST_FOUND_TIME",lastFoundTime); + } + + private BaseDocument mergeVertex(ArrayList documentArrayList){ + if (documentArrayList == null || documentArrayList.isEmpty()){ + return null; + }else if (documentArrayList.size() == 1){ + return documentArrayList.get(0); + }else { + BaseDocument document = new BaseDocument(); + Map properties = document.getProperties(); + for (BaseDocument doc:documentArrayList){ + if (properties.isEmpty()){ + document = doc; + properties = doc.getProperties(); + }else { + mergeFunction(properties,doc); + } + } + document.setProperties(properties); + return document; + } + } + + protected void mergeFunction(Map properties,BaseDocument doc){ + mergeFoundTime(properties,doc); + } + + private void mergeFoundTime(Map properties,BaseDocument doc){ + long firstFoundTime = Long.parseLong(properties.getOrDefault("FIRST_FOUND_TIME", 0L).toString()); + long docFirstFoundTime = Long.parseLong(doc.getAttribute("FIRST_FOUND_TIME").toString()); + properties.put("FIRST_FOUND_TIME",firstFoundTime<docFirstFoundTime?firstFoundTime:docFirstFoundTime); + long lastFoundTime = Long.parseLong(properties.getOrDefault("LAST_FOUND_TIME", 0L).toString()); + long docLastFoundTime = Long.parseLong(doc.getAttribute("LAST_FOUND_TIME").toString()); + properties.put("LAST_FOUND_TIME",lastFoundTime>docLastFoundTime?
lastFoundTime:docLastFoundTime); + } +} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/vertex/Fqdn.java b/IP-learning-graph/src/main/java/cn/ac/iie/etl/vertex/Fqdn.java new file mode 100644 index 0000000..426a38b --- /dev/null +++ b/IP-learning-graph/src/main/java/cn/ac/iie/etl/vertex/Fqdn.java @@ -0,0 +1,22 @@ +package cn.ac.iie.etl.vertex; + +import cn.ac.iie.etl.update.Vertex; +import cn.ac.iie.utils.ArangoDBConnect; +import com.arangodb.entity.BaseDocument; +import com.arangodb.entity.BaseEdgeDocument; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; + +public class Fqdn extends Vertex { + + public Fqdn(HashMap> newDocumentHashMap, + ArangoDBConnect arangoManger, + String collectionName, + ConcurrentHashMap historyDocumentMap, + CountDownLatch countDownLatch) { + super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap,countDownLatch); + } +} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/vertex/Ip.java b/IP-learning-graph/src/main/java/cn/ac/iie/etl/vertex/Ip.java new file mode 100644 index 0000000..efa37cf --- /dev/null +++ b/IP-learning-graph/src/main/java/cn/ac/iie/etl/vertex/Ip.java @@ -0,0 +1,35 @@ +package cn.ac.iie.etl.vertex; + +import cn.ac.iie.etl.update.Vertex; +import cn.ac.iie.utils.ArangoDBConnect; +import com.arangodb.entity.BaseDocument; +import com.arangodb.entity.BaseEdgeDocument; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; + +public class Ip extends Vertex { + + public Ip(HashMap> newDocumentHashMap, + ArangoDBConnect arangoManger, + String collectionName, + ConcurrentHashMap historyDocumentMap, + CountDownLatch countDownLatch) { + super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap,countDownLatch); + } + + @Override + protected void 
mergeFunction(Map properties, BaseDocument doc) { + super.mergeFunction(properties, doc); + + } + + @Override + protected void updateFunction(BaseDocument newDocument, BaseDocument historyDocument) { + super.updateFunction(newDocument, historyDocument); + + } +} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java b/IP-learning-graph/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java index 86a62f1..c425839 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java @@ -1,75 +1,22 @@ package cn.ac.iie.test; import cn.ac.iie.dao.BaseArangoData; -import cn.ac.iie.dao.BaseClickhouseData; -import cn.ac.iie.utils.ArangoDBConnect; -import cn.ac.iie.utils.ExecutorThreadPool; +import cn.ac.iie.dao.UpdateGraphData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.CountDownLatch; public class IpLearningApplicationTest { private static final Logger LOG = LoggerFactory.getLogger(IpLearningApplicationTest.class); public static void main(String[] args) { + LOG.info("Ip Learning Application开始运行"); + BaseArangoData baseArangoData = new BaseArangoData(); + baseArangoData.baseDocumentDataMap(); - long startA = System.currentTimeMillis(); - BaseArangoData.BaseVFqdnDataMap(); - BaseArangoData.BaseVIpDataMap(); - BaseArangoData.BaseEFqdnAddressIpDataMap(); - BaseArangoData.BaseEIpVisitFqdnDataMap(); - - - ExecutorThreadPool.shutdown(); - ExecutorThreadPool.awaitThreadTask(); - long lastA = System.currentTimeMillis(); - LOG.info("读取ArangoDb时间:"+(lastA - startA)); - - long startC = System.currentTimeMillis(); - - try { - CountDownLatch countDownLatch = new CountDownLatch(4); - - new Thread(() -> { - BaseClickhouseData.BaseVFqdn(); - countDownLatch.countDown(); - }).start(); - - new Thread(() -> { - BaseClickhouseData.BaseVIp(); - countDownLatch.countDown(); - }).start(); - - new Thread(() -> { 
- BaseClickhouseData.BaseEFqdnAddressIp(); - countDownLatch.countDown(); - }).start(); - - new Thread(() -> { - BaseClickhouseData.BaseEIpVisitFqdn(); - countDownLatch.countDown(); - }).start(); - - try { - countDownLatch.await(); - LOG.info("主线程等待完毕"); - }catch (Exception e){ - LOG.error("主线程阻塞异常:\n"+e.toString()); - } - - long lastC = System.currentTimeMillis(); - LOG.info("更新ArangoDb时间:"+(lastC - startC)); - }catch (Exception e){ - e.printStackTrace(); - }finally { - ArangoDBConnect.clean(); - } - - LOG.info("v_Fqdn_Map大小:"+BaseArangoData.v_Fqdn_Map.size()); - LOG.info("v_Ip_Map大小:"+BaseArangoData.v_Ip_Map.size()); - LOG.info("e_Fqdn_Address_Ip_Map大小:"+BaseArangoData.e_Fqdn_Address_Ip_Map.size()); - LOG.info("e_Ip_Visit_Fqdn_Map大小:"+BaseArangoData.e_Ip_Visit_Fqdn_Map.size()); + LOG.info("历史数据读取完成,开始更新数据"); + UpdateGraphData updateGraphData = new UpdateGraphData(); + updateGraphData.updateArango(); } } diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/test/ReadArangoDBThreadTest.java b/IP-learning-graph/src/main/java/cn/ac/iie/test/ReadArangoDBThreadTest.java deleted file mode 100644 index b913e6b..0000000 --- a/IP-learning-graph/src/main/java/cn/ac/iie/test/ReadArangoDBThreadTest.java +++ /dev/null @@ -1,95 +0,0 @@ -package cn.ac.iie.test; - -import cn.ac.iie.config.ApplicationConfig; -import com.arangodb.ArangoCursor; -import com.arangodb.ArangoDB; -import com.arangodb.ArangoDatabase; -import com.arangodb.entity.BaseDocument; -import com.arangodb.model.AqlQueryOptions; -import com.arangodb.util.MapBuilder; - -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -public class ReadArangoDBThreadTest { - - private static ConcurrentHashMap fqdnMap = new ConcurrentHashMap(); - public static void main(String[] args) throws Exception { - final ArangoDB arangoDB = new ArangoDB.Builder() - 
.maxConnections(ApplicationConfig.THREAD_POOL_NUMBER) - .host(ApplicationConfig.ARANGODB_HOST, ApplicationConfig.ARANGODB_PORT) - .user(ApplicationConfig.ARANGODB_USER) - .password(ApplicationConfig.ARANGODB_PASSWORD) - .build(); - Map bindVars = new MapBuilder().get(); - AqlQueryOptions options = new AqlQueryOptions().ttl(ApplicationConfig.ARANGODB_TTL); - - String sql = "LET FQDN = (FOR doc IN V_FQDN RETURN doc) return {max_time:MAX(FQDN[*].FQDN_FIRST_FOUND_TIME),min_time:MIN(FQDN[*].FQDN_FIRST_FOUND_TIME)}"; -// String sql = "LET IP = (FOR doc IN V_IP RETURN doc) return {max_time:MAX(IP[*].FIRST_FOUND_TIME),min_time:MIN(IP[*].FIRST_FOUND_TIME)}"; - final ArangoDatabase db = arangoDB.db("insert_iplearn_index"); - long startTime = System.currentTimeMillis(); - ArangoCursor timeDoc = db.query(sql, bindVars, options, BaseDocument.class); - long maxTime =0L; - long minTime =0L; - while (timeDoc.hasNext()){ - BaseDocument doc = timeDoc.next(); - maxTime = Long.parseLong(doc.getAttribute("max_time").toString()) + ApplicationConfig.THREAD_POOL_NUMBER; - minTime = Long.parseLong(doc.getAttribute("min_time").toString()); - } - long lastTime = System.currentTimeMillis(); - System.out.println("查询最大最小时间用时:"+(lastTime-startTime)); - System.out.println(maxTime + "--" + minTime); - final long diffTime = (maxTime - minTime) / ApplicationConfig.THREAD_POOL_NUMBER; - ExecutorService pool = Executors.newFixedThreadPool(ApplicationConfig.THREAD_POOL_NUMBER); - - for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { -// long finalMaxTime = maxTime; - final long finalMinTime = minTime; - pool.execute(new Runnable() { - - public void run() { - String name = Thread.currentThread().getName(); - ArangoDatabase insert_iplearn_index = arangoDB.db("insert_iplearn_index"); - Map bindVars = new MapBuilder().get(); - AqlQueryOptions options = new AqlQueryOptions().ttl(ApplicationConfig.ARANGODB_TTL); - String[] split = name.split("-"); - Long threadNum = Long.parseLong(split[3]); - 
long maxThreadTime = finalMinTime + threadNum * diffTime; - long minThreadTime = finalMinTime + (threadNum-1)*diffTime; - String query = "FOR doc IN V_FQDN filter doc.FQDN_FIRST_FOUND_TIME >= "+minThreadTime+" and doc.FQDN_FIRST_FOUND_TIME <= "+maxThreadTime+" RETURN doc"; -// String query = "FOR doc IN V_IP filter doc.FIRST_FOUND_TIME >= "+minThreadTime+" and doc.FIRST_FOUND_TIME <= "+maxThreadTime+" RETURN doc"; - System.out.println(name+":"+query); - long s = System.currentTimeMillis(); - ArangoCursor fqdnDoc = insert_iplearn_index.query(query, bindVars, options, BaseDocument.class); - List baseDocuments = fqdnDoc.asListRemaining(); - int i = 0; - for (BaseDocument doc:baseDocuments){ - String key = doc.getKey(); -// System.out.println(name+":"+key); - fqdnMap.put(key,doc); - i++; - } - /* - while (fqdnDoc.hasNext()){ - BaseDocument doc = fqdnDoc.next(); - } - */ - System.out.println(name+":"+ i); - long l = System.currentTimeMillis(); - System.out.println(name+"运行时间:"+(l-s)); - } - }); - } - pool.shutdown(); - while (!pool.awaitTermination(20, TimeUnit.SECONDS)){ - - } - System.out.println(fqdnMap.size()); - arangoDB.shutdown(); - - } -} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/utils/ExecutorThreadPool.java b/IP-learning-graph/src/main/java/cn/ac/iie/utils/ExecutorThreadPool.java index 99ff051..676b887 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/utils/ExecutorThreadPool.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/utils/ExecutorThreadPool.java @@ -32,7 +32,7 @@ public class ExecutorThreadPool { pool.execute(command); } - public static void awaitThreadTask(){ + public void awaitThreadTask(){ try { while (!pool.awaitTermination(ApplicationConfig.THREAD_AWAIT_TERMINATION_TIME, TimeUnit.SECONDS)) { LOG.warn("线程池没有关闭"); @@ -42,7 +42,7 @@ public class ExecutorThreadPool { } } - public static void shutdown(){ + public void shutdown(){ pool.shutdown(); } diff --git a/IP-learning-graph/src/main/resources/application.properties 
b/IP-learning-graph/src/main/resources/application.properties index 753ac9e..9a1f229 100644 --- a/IP-learning-graph/src/main/resources/application.properties +++ b/IP-learning-graph/src/main/resources/application.properties @@ -1,5 +1,5 @@ #arangoDB参数配置 -arangoDB.host=192.168.40.182 +arangoDB.host=192.168.40.127 arangoDB.port=8529 arangoDB.user=root arangoDB.password=111111 @@ -13,5 +13,5 @@ update.arango.batch=10000 thread.pool.number=10 thread.await.termination.time=10 -read.clickhouse.max.time=1593582211 -read.clickhouse.min.time=1592879247 \ No newline at end of file +read.clickhouse.max.time=1594194404 +read.clickhouse.min.time=1593676953 \ No newline at end of file diff --git a/IP-learning-graph/src/test/java/cn/ac/iie/TestMap.java b/IP-learning-graph/src/test/java/cn/ac/iie/TestMap.java index 34a39c5..46e6c88 100644 --- a/IP-learning-graph/src/test/java/cn/ac/iie/TestMap.java +++ b/IP-learning-graph/src/test/java/cn/ac/iie/TestMap.java @@ -106,4 +106,25 @@ public class TestMap { } + + public boolean isValid(String s) { + HashMap map = new HashMap<>(); + map.put('(',')'); + map.put('[',']'); + map.put('{','}'); + + Stack stack = new Stack(); + for (int i = 0;i < s.length();i++){ + Character c = s.charAt(i); + if (map.containsKey(c)){ + Character c1 = stack.isEmpty() ? '#' : stack.pop(); + if (map.get(c) != c1){ + return false; + } + }else { + stack.push(c); + } + } + return stack.isEmpty(); + } }