diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/config/ApplicationConfig.java b/IP-learning-graph/src/main/java/cn/ac/iie/config/ApplicationConfig.java
index 61f7826..7e0492b 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/config/ApplicationConfig.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/config/ApplicationConfig.java
@@ -21,5 +21,11 @@ public class ApplicationConfig {
 
     public static final Long READ_CLICKHOUSE_MAX_TIME = ConfigUtils.getLongProperty("read.clickhouse.max.time");
     public static final Long READ_CLICKHOUSE_MIN_TIME = ConfigUtils.getLongProperty("read.clickhouse.min.time");
+    public static final Integer TIME_LIMIT_TYPE = ConfigUtils.getIntProperty("time.limit.type");
+    public static final Integer UPDATE_INTERVAL = ConfigUtils.getIntProperty("update.interval");
+
+    public static final Integer DISTINCT_CLIENT_IP_NUM = ConfigUtils.getIntProperty("distinct.client.ip.num");
+    public static final Integer RECENT_COUNT_HOUR = ConfigUtils.getIntProperty("recent.count.hour");
+
 }
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java
index 15d918f..55d917c 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java
@@ -19,20 +19,25 @@ import java.util.concurrent.CountDownLatch;
 public class BaseArangoData {
     private static final Logger LOG = LoggerFactory.getLogger(BaseArangoData.class);
 
-    public static ConcurrentHashMap<String, BaseDocument> historyVertexFqdnMap = new ConcurrentHashMap<>();
-    public static ConcurrentHashMap<String, BaseDocument> historyVertexIpMap = new ConcurrentHashMap<>();
-    public static ConcurrentHashMap<String, BaseDocument> historyVertexSubscriberMap = new ConcurrentHashMap<>();
-    public static ConcurrentHashMap<String, BaseEdgeDocument> historyRelationFqdnAddressIpMap = new ConcurrentHashMap<>();
-    public static ConcurrentHashMap<String, BaseEdgeDocument> historyRelationIpVisitFqdnMap = new ConcurrentHashMap<>();
-    public static ConcurrentHashMap<String, BaseEdgeDocument> historyRelationSubsciberLocateIpMap = new ConcurrentHashMap<>();
+    static ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseDocument>> historyVertexFqdnMap = new ConcurrentHashMap<>();
+    static ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseDocument>> historyVertexIpMap = new ConcurrentHashMap<>();
+    static ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseDocument>> historyVertexSubscriberMap = new ConcurrentHashMap<>();
+    static ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseEdgeDocument>> historyRelationFqdnAddressIpMap = new ConcurrentHashMap<>();
+    static ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseEdgeDocument>> historyRelationIpVisitFqdnMap = new ConcurrentHashMap<>();
+    static ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseEdgeDocument>> historyRelationFqdnSameFqdnMap = new ConcurrentHashMap<>();
+    static ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseEdgeDocument>> historyRelationSubsciberLocateIpMap = new ConcurrentHashMap<>();
 
     private static ArangoDBConnect arangoDBConnect = ArangoDBConnect.getInstance();
     private ExecutorThreadPool threadPool = ExecutorThreadPool.getInstance();
 
-    <T extends BaseDocument> void readHistoryData(String table, ConcurrentHashMap<String, T> map, Class<T> type){
+    <T extends BaseDocument> void readHistoryData(String table, ConcurrentHashMap<Integer, ConcurrentHashMap<String, T>> map, Class<T> type){
         try {
+            LOG.info("开始更新"+table);
             long start = System.currentTimeMillis();
+            for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++){
+                map.put(i, new ConcurrentHashMap<>());
+            }
             CountDownLatch countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER);
             long[] timeRange = getTimeRange(table);
             for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) {
@@ -43,7 +48,6 @@ public class BaseArangoData {
             countDownLatch.await();
             long last = System.currentTimeMillis();
             LOG.info("读取"+table+" arangoDB 共耗时:"+(last-start));
-            LOG.info(table+" history Map大小为:"+map.size());
         }catch (Exception e){
             e.printStackTrace();
         }
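The history maps above change from flat key-to-document maps into two-level maps keyed by a thread slot, so each worker thread later owns exactly one inner partition and threads never contend on the same inner map. A self-contained sketch of the scheme follows (stand-in types, not the project's classes); note that Math.abs(key.hashCode()) is still negative for Integer.MIN_VALUE, so a production version might prefer Math.floorMod:

    import java.util.concurrent.ConcurrentHashMap;

    // Sketch of the Integer -> inner-map partitioning used by the history maps:
    // the outer key is derived from the document key's hash, so a worker thread
    // only ever touches the partition matching its own index.
    public class PartitionSketch {
        static final int THREAD_POOL_NUMBER = 10; // mirrors thread.pool.number

        public static void main(String[] args) {
            ConcurrentHashMap<Integer, ConcurrentHashMap<String, String>> partitions = new ConcurrentHashMap<>();
            for (int i = 0; i < THREAD_POOL_NUMBER; i++) {
                partitions.put(i, new ConcurrentHashMap<>());
            }
            String key = "www.example.com";
            int slot = Math.abs(key.hashCode()) % THREAD_POOL_NUMBER; // same formula as putMapByHashcode
            partitions.get(slot).put(key, "doc-for-" + key);
            System.out.println(key + " -> partition " + slot);
        }
    }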
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java
index fd98cab..2bfc5e3 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java
@@ -12,6 +12,8 @@ import java.sql.ResultSet;
 import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.function.Function;
+import java.util.function.Supplier;
 
 import static cn.ac.iie.service.ingestion.ReadClickhouseData.*;
 
@@ -33,143 +35,31 @@ public class BaseClickhouseData {
     private DruidPooledConnection connection;
     private Statement statement;
 
-    void baseVertexFqdn() {
-        initializeMap(newVertexFqdnMap);
-        LOG.info("FQDN resultMap初始化完成");
-        String sql = getVertexFqdnSql();
+    <T extends BaseDocument> void baseDocumentFromClickhouse(HashMap<Integer, HashMap<String, ArrayList<T>>> newMap, Supplier<String> getSqlSupplier, Function<ResultSet, T> formatResultFunc){
         long start = System.currentTimeMillis();
+        initializeMap(newMap);
+        String sql = getSqlSupplier.get();
         try {
             connection = manger.getConnection();
             statement = connection.createStatement();
             ResultSet resultSet = statement.executeQuery(sql);
+            int i = 0;
             while (resultSet.next()) {
-                BaseDocument newDoc = getVertexFqdnDocument(resultSet);
+                T newDoc = formatResultFunc.apply(resultSet);
                 if (newDoc != null) {
+                    i+=1;
-                    putMapByHashcode(newDoc, newVertexFqdnMap);
+                    putMapByHashcode(newDoc, newMap);
                 }
             }
             long last = System.currentTimeMillis();
-            LOG.info(sql + "\n读取clickhouse v_FQDN时间:" + (last - start));
-        } catch (Exception e) {
-            LOG.error(e.toString());
-        }finally {
-            manger.clear(statement,connection);
-        }
-    }
-
-    void baseVertexIp() {
-        initializeMap(newVertexIpMap);
-        LOG.info("IP resultMap初始化完成");
-        String sql = getVertexIpSql();
-        long start = System.currentTimeMillis();
-        try {
-            connection = manger.getConnection();
-            statement = connection.createStatement();
-            ResultSet resultSet = statement.executeQuery(sql);
-            while (resultSet.next()) {
-                BaseDocument newDoc = getVertexIpDocument(resultSet);
-                putMapByHashcode(newDoc, newVertexIpMap);
-            }
-            long last = System.currentTimeMillis();
-            LOG.info(sql + "\n读取clickhouse v_IP时间:" + (last - start));
-        } catch (Exception e) {
-            LOG.error(e.toString());
-        }finally {
-            manger.clear(statement,connection);
-        }
-    }
-
-    void baseVertexSubscriber(){
-        initializeMap(newVertexSubscriberMap);
-        LOG.info("SUBSCRIBER resultMap初始化完成");
-        String sql = getVertexSubscriberSql();
-        long start = System.currentTimeMillis();
-        try {
-            connection = manger.getConnection();
-            statement = connection.createStatement();
-            ResultSet resultSet = statement.executeQuery(sql);
-            while (resultSet.next()){
-                BaseDocument newDoc = getVertexSubscriberDocument(resultSet);
-                putMapByHashcode(newDoc, newVertexSubscriberMap);
-            }
-            long last = System.currentTimeMillis();
-            LOG.info(sql + "\n读取clickhouse v_SUBSCRIBER时间:" + (last - start));
+            LOG.info(sql + "\n读取"+i+"条数据,运行时间:" + (last - start));
         }catch (Exception e){
-            LOG.error(sql + "\n读取clickhouse v_SUBSCRIBER失败");
             e.printStackTrace();
         }finally {
             manger.clear(statement,connection);
         }
     }
 
-    void baseRelationshipSubscriberLocateIp(){
-        initializeMap(newRelationSubsciberLocateIpMap);
-        LOG.info("R_LOCATE_SUBSCRIBER2IP");
-        String sql = getRelationshipSubsciberLocateIpSql();
-        long start = System.currentTimeMillis();
-        try {
-            connection = manger.getConnection();
-            statement = connection.createStatement();
-            ResultSet resultSet = statement.executeQuery(sql);
-            while (resultSet.next()){
-                BaseEdgeDocument newDoc = getRelationshipSubsciberLocateIpDocument(resultSet);
-                putMapByHashcode(newDoc, newRelationSubsciberLocateIpMap);
-            }
-            long last = System.currentTimeMillis();
-            LOG.info(sql + "\n读取clickhouse ESubsciberLocateIp时间:" + (last - start));
-        }catch (Exception e){
-            LOG.error(sql + "\n读取clickhouse ESubsciberLocateIp失败");
-            e.printStackTrace();
-        }finally {
-            manger.clear(statement,connection);
-        }
-    }
-
-    void baseRelationshipFqdnAddressIp() {
-        initializeMap(newRelationFqdnAddressIpMap);
-        LOG.info("R_LOCATE_FQDN2IP resultMap初始化完成");
-        String sql = getRelationshipFqdnAddressIpSql();
-        long start = System.currentTimeMillis();
-        try {
-            connection = manger.getConnection();
-            statement = connection.createStatement();
-            ResultSet resultSet = statement.executeQuery(sql);
-
-            while (resultSet.next()) {
-                BaseEdgeDocument newDoc = getRelationFqdnAddressIpDocument(resultSet);
-                putMapByHashcode(newDoc, newRelationFqdnAddressIpMap);
-            }
-            long last = System.currentTimeMillis();
-            LOG.info(sql + "\n读取clickhouse EFqdnAddressIp时间:" + (last - start));
-        } catch (Exception e) {
-            LOG.error(e.toString());
-        }finally {
-            manger.clear(statement,connection);
-        }
-    }
-
-    void baseRelationshipIpVisitFqdn() {
-        initializeMap(newRelationIpVisitFqdnMap);
-        LOG.info("R_VISIT_IP2FQDN resultMap初始化完成");
-        String sql = getRelationshipIpVisitFqdnSql();
-        long start = System.currentTimeMillis();
-        try {
-            connection = manger.getConnection();
-            statement = connection.createStatement();
-            ResultSet resultSet = statement.executeQuery(sql);
-            while (resultSet.next()) {
-                BaseEdgeDocument newDoc = getRelationIpVisitFqdnDocument(resultSet);
-                putMapByHashcode(newDoc, newRelationIpVisitFqdnMap);
-            }
-            long last = System.currentTimeMillis();
-            LOG.info(sql + "\n读取clickhouse EIpVisitFqdn时间:" + (last - start));
-        } catch (Exception e) {
-            LOG.error(e.toString());
-        }finally {
-            manger.clear(statement,connection);
-        }
-    }
-
     private <T extends BaseDocument> void initializeMap(HashMap<Integer, HashMap<String, ArrayList<T>>> map){
         try {
             for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) {
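The six copy-pasted loaders above collapse into a single generic baseDocumentFromClickhouse, driven by a Supplier<String> for the SQL and a Function<ResultSet, T> for row mapping. A minimal standalone sketch of that shape (stand-in types and fake rows instead of a live JDBC connection):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Function;
    import java.util.function.Supplier;

    // One loader handles every table once it is told "which SQL" and
    // "how to map a row"; null mappings are skipped, as in the real method.
    public class GenericLoaderSketch {
        static <T> List<T> load(Supplier<String> sqlSupplier, Function<String, T> rowMapper, List<String> fakeRows) {
            System.out.println("executing: " + sqlSupplier.get());
            List<T> out = new ArrayList<>();
            for (String row : fakeRows) {
                T doc = rowMapper.apply(row);
                if (doc != null) {
                    out.add(doc);
                }
            }
            return out;
        }

        public static void main(String[] args) {
            List<Integer> lengths = load(() -> "SELECT FQDN FROM t", String::length, List.of("a.com", "b.org"));
            System.out.println(lengths); // [5, 5]
        }
    }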
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java b/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java
index b888411..dbbd81f 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java
@@ -1,6 +1,7 @@
 package cn.ac.iie.dao;
 
 import cn.ac.iie.config.ApplicationConfig;
+import cn.ac.iie.service.ingestion.ReadClickhouseData;
 import cn.ac.iie.service.update.Document;
 import cn.ac.iie.service.update.relationship.LocateFqdn2Ip;
 import cn.ac.iie.service.update.relationship.LocateSubscriber2Ip;
@@ -15,9 +16,14 @@ import com.arangodb.entity.BaseEdgeDocument;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.lang.reflect.Constructor;
+import java.sql.ResultSet;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.function.Function;
+import java.util.function.Supplier;
 
 import static cn.ac.iie.dao.BaseArangoData.*;
 import static cn.ac.iie.dao.BaseClickhouseData.*;
@@ -30,35 +36,41 @@ public class UpdateGraphData {
     private static final Logger LOG = LoggerFactory.getLogger(UpdateGraphData.class);
     private static ExecutorThreadPool pool = ExecutorThreadPool.getInstance();
     private static ArangoDBConnect arangoManger = ArangoDBConnect.getInstance();
-
+    private static BaseArangoData baseArangoData = new BaseArangoData();
     private static BaseClickhouseData baseClickhouseData = new BaseClickhouseData();
-    private CountDownLatch countDownLatch;
+
     public void updateArango(){
         long start = System.currentTimeMillis();
         try {
-            BaseArangoData baseArangoData = new BaseArangoData();
-            baseArangoData.readHistoryData("FQDN", historyVertexFqdnMap,BaseDocument.class);
-            updateVertexFqdn();
+            updateDocument(newVertexFqdnMap, historyVertexFqdnMap, "FQDN",
+                    Fqdn.class,BaseDocument.class,
+                    ReadClickhouseData::getVertexFqdnSql,ReadClickhouseData::getVertexFqdnDocument);
 
-            baseArangoData.readHistoryData("IP", historyVertexIpMap,BaseDocument.class);
-            updateVertexIp();
+            updateDocument(newVertexIpMap,historyVertexIpMap,"IP",
+                    Ip.class,BaseDocument.class,
+                    ReadClickhouseData::getVertexIpSql,ReadClickhouseData::getVertexIpDocument);
 
-            baseArangoData.readHistoryData("SUBSCRIBER", historyVertexSubscriberMap,BaseDocument.class);
-            updateVertexSubscriber();
+            updateDocument(newVertexSubscriberMap,historyVertexSubscriberMap,"SUBSCRIBER",
+                    Subscriber.class,BaseDocument.class,
+                    ReadClickhouseData::getVertexSubscriberSql,ReadClickhouseData::getVertexSubscriberDocument);
 
-            baseArangoData.readHistoryData("R_LOCATE_FQDN2IP", historyRelationFqdnAddressIpMap,BaseEdgeDocument.class);
-            updateRelationFqdnAddressIp();
+            updateDocument(newRelationFqdnAddressIpMap,historyRelationFqdnAddressIpMap,"R_LOCATE_FQDN2IP",
+                    LocateFqdn2Ip.class,BaseEdgeDocument.class,
+                    ReadClickhouseData::getRelationshipFqdnAddressIpSql,ReadClickhouseData::getRelationFqdnAddressIpDocument);
 
-            baseArangoData.readHistoryData("R_VISIT_IP2FQDN", historyRelationIpVisitFqdnMap,BaseEdgeDocument.class);
-            updateRelationIpVisitFqdn();
+            updateDocument(newRelationIpVisitFqdnMap,historyRelationIpVisitFqdnMap,"R_VISIT_IP2FQDN",
+                    VisitIp2Fqdn.class,BaseEdgeDocument.class,
+                    ReadClickhouseData::getRelationshipIpVisitFqdnSql,ReadClickhouseData::getRelationIpVisitFqdnDocument);
+
+            updateDocument(newRelationSubsciberLocateIpMap,historyRelationSubsciberLocateIpMap,"R_LOCATE_SUBSCRIBER2IP",
+                    LocateSubscriber2Ip.class,BaseEdgeDocument.class,
+                    ReadClickhouseData::getRelationshipSubsciberLocateIpSql,ReadClickhouseData::getRelationshipSubsciberLocateIpDocument);
 
-            baseArangoData.readHistoryData("R_LOCATE_SUBSCRIBER2IP", historyRelationSubsciberLocateIpMap,BaseEdgeDocument.class);
-            updateRelationshipSubsciberLocateIp();
             long last = System.currentTimeMillis();
-            LOG.info("更新图数据库时间共计:"+(last - start));
+            LOG.info("iplearning application运行完毕,用时:"+(last - start));
         }catch (Exception e){
             e.printStackTrace();
         }finally {
@@ -67,130 +79,46 @@ public class UpdateGraphData {
         }
     }
 
-    private void updateVertexFqdn(){
+
+    private <T extends BaseDocument> void updateDocument(HashMap<Integer, HashMap<String, ArrayList<T>>> newMap,
+                                ConcurrentHashMap<Integer, ConcurrentHashMap<String, T>> historyMap,
+                                String collection,
+                                Class<? extends Document<T>> taskType,
+                                Class<T> documentType,
+                                Supplier<String> getSqlSupplier,
+                                Function<ResultSet, T> formatResultFunc
+    ) {
         try {
+
+            baseArangoData.readHistoryData(collection,historyMap,documentType);
+            LOG.info(collection+" 读取clickhouse,封装结果集");
+            baseClickhouseData.baseDocumentFromClickhouse(newMap, getSqlSupplier,formatResultFunc);
+
+            LOG.info(collection+" 开始更新");
             long start = System.currentTimeMillis();
-            baseClickhouseData.baseVertexFqdn();
-            countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER);
-            for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) {
-                HashMap<String, ArrayList<BaseDocument>> tmpMap = newVertexFqdnMap.get(i);
-                Document updateFqdn = new Fqdn(tmpMap, arangoManger, "FQDN", historyVertexFqdnMap,countDownLatch);
-                pool.executor(updateFqdn);
+            CountDownLatch countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER);
+            for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++){
+                HashMap<String, ArrayList<T>> tmpNewMap = newMap.get(i);
+                ConcurrentHashMap<String, T> tmpHisMap = historyMap.get(i);
+                Constructor<? extends Document<T>> constructor = taskType.getConstructor(
+                        HashMap.class,
+                        ArangoDBConnect.class,
+                        String.class,
+                        ConcurrentHashMap.class,
+                        CountDownLatch.class);
+                Document<T> docTask = (Document<T>)constructor.newInstance(tmpNewMap, arangoManger, collection, tmpHisMap, countDownLatch);
+                pool.executor(docTask);
             }
             countDownLatch.await();
             long last = System.currentTimeMillis();
-            LOG.info("FQDN vertex 更新完毕,共耗时:"+(last-start));
+            LOG.info(collection+" 更新完毕,共耗时:"+(last-start));
         }catch (Exception e){
             e.printStackTrace();
         }finally {
-            historyVertexFqdnMap.clear();
-            newVertexFqdnMap.clear();
+            newMap.clear();
+            historyMap.clear();
         }
     }
 
-    private void updateVertexSubscriber(){
-        try {
-            long start = System.currentTimeMillis();
-            baseClickhouseData.baseVertexSubscriber();
-            countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER);
-            for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) {
-                HashMap<String, ArrayList<BaseDocument>> tmpMap = newVertexSubscriberMap.get(i);
-                Subscriber updateSubscriber = new Subscriber(tmpMap, arangoManger, "SUBSCRIBER", historyVertexSubscriberMap,countDownLatch);
-                pool.executor(updateSubscriber);
-            }
-            countDownLatch.await();
-            long last = System.currentTimeMillis();
-            LOG.info("SUBSCRIBER vertex 更新完毕,共耗时:"+(last-start));
-        }catch (Exception e){
-            e.printStackTrace();
-        }finally {
-            historyVertexSubscriberMap.clear();
-            newVertexSubscriberMap.clear();
-        }
-    }
-
-    private void updateRelationshipSubsciberLocateIp(){
-        try {
-            long start = System.currentTimeMillis();
-            baseClickhouseData.baseRelationshipSubscriberLocateIp();
-            countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER);
-            for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) {
-                HashMap<String, ArrayList<BaseEdgeDocument>> tmpMap = newRelationSubsciberLocateIpMap.get(i);
-                LocateSubscriber2Ip locateSubscriber2Ip = new LocateSubscriber2Ip(tmpMap, arangoManger, "R_LOCATE_SUBSCRIBER2IP", historyRelationSubsciberLocateIpMap, countDownLatch);
-                pool.executor(locateSubscriber2Ip);
-            }
-            countDownLatch.await();
-            long last = System.currentTimeMillis();
-            LOG.info("R_LOCATE_SUBSCRIBER2IP relationship 更新完毕,共耗时:"+(last-start));
-        }catch (Exception e){
-            e.printStackTrace();
-        }finally {
-            historyRelationSubsciberLocateIpMap.clear();
-            newRelationSubsciberLocateIpMap.clear();
-        }
-    }
-
-    private void updateVertexIp(){
-        try {
-            long start = System.currentTimeMillis();
-            baseClickhouseData.baseVertexIp();
-            countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER);
-            for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) {
-                HashMap<String, ArrayList<BaseDocument>> tmpMap = newVertexIpMap.get(i);
-                Ip updateIp = new Ip(tmpMap, arangoManger, "IP", historyVertexIpMap, countDownLatch);
-                pool.executor(updateIp);
-            }
-            countDownLatch.await();
-            long last = System.currentTimeMillis();
-            LOG.info("IP vertex 更新完毕,共耗时:"+(last-start));
-        }catch (Exception e){
-            e.printStackTrace();
-        }finally {
-            historyVertexIpMap.clear();
-            newVertexIpMap.clear();
-        }
-    }
-
-    private void updateRelationFqdnAddressIp(){
-        try {
-            long start = System.currentTimeMillis();
-            baseClickhouseData.baseRelationshipFqdnAddressIp();
-            countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER);
-            for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) {
-                HashMap<String, ArrayList<BaseEdgeDocument>> tmpMap = newRelationFqdnAddressIpMap.get(i);
-                LocateFqdn2Ip fqdnAddressIp = new LocateFqdn2Ip(tmpMap, arangoManger, "R_LOCATE_FQDN2IP", historyRelationFqdnAddressIpMap, countDownLatch);
-                pool.executor(fqdnAddressIp);
-            }
-            countDownLatch.await();
-            long last = System.currentTimeMillis();
-            LOG.info("R_LOCATE_FQDN2IP relationship 更新完毕,共耗时:"+(last-start));
-        }catch (Exception e){
-            e.printStackTrace();
-        }finally {
-            historyRelationFqdnAddressIpMap.clear();
-            newRelationFqdnAddressIpMap.clear();
-        }
-    }
-
-    private void updateRelationIpVisitFqdn(){
-        try {
-            long start = System.currentTimeMillis();
-            baseClickhouseData.baseRelationshipIpVisitFqdn();
-            countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER);
-            for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) {
-                HashMap<String, ArrayList<BaseEdgeDocument>> tmpMap = newRelationIpVisitFqdnMap.get(i);
-                VisitIp2Fqdn ipVisitFqdn = new VisitIp2Fqdn(tmpMap,arangoManger,"R_VISIT_IP2FQDN", historyRelationIpVisitFqdnMap,countDownLatch);
-                pool.executor(ipVisitFqdn);
-            }
-            countDownLatch.await();
-            long last = System.currentTimeMillis();
-            LOG.info("R_VISIT_IP2FQDN ralationship 更新完毕,共耗时:"+(last-start));
-        }catch (Exception e){
-            e.printStackTrace();
-        }finally {
-            historyRelationIpVisitFqdnMap.clear();
-            newRelationIpVisitFqdnMap.clear();
-        }
-    }
 }
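updateDocument builds its per-partition worker threads reflectively: the task class arrives as a Class object and its five-argument public constructor is looked up, then instantiated once per partition. A compact sketch of that mechanism under simplified types (Worker here is a hypothetical stand-in, not a project class):

    import java.lang.reflect.Constructor;

    // Look up a public constructor by its raw parameter types, then build
    // one worker per partition, exactly as updateDocument does for Fqdn,
    // Ip, LocateFqdn2Ip, and the other task classes.
    public class ReflectiveTaskSketch {
        public static class Worker implements Runnable {
            private final String collection;
            private final int slot;
            public Worker(String collection, Integer slot) { this.collection = collection; this.slot = slot; }
            @Override public void run() { System.out.println("updating " + collection + " partition " + slot); }
        }

        public static void main(String[] args) throws Exception {
            Class<? extends Runnable> taskType = Worker.class;
            Constructor<? extends Runnable> ctor = taskType.getConstructor(String.class, Integer.class);
            for (int i = 0; i < 3; i++) {
                Runnable task = ctor.newInstance("FQDN", i);
                task.run(); // the real code hands the task to the thread pool instead
            }
        }
    }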
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadClickhouseData.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadClickhouseData.java
index 275491c..5700cb8 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadClickhouseData.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadClickhouseData.java
@@ -7,7 +7,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.sql.ResultSet;
-import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -23,142 +22,168 @@ public class ReadClickhouseData {
 
     private static Pattern pattern = Pattern.compile("^[\\d]*$");
     private static final Logger LOG = LoggerFactory.getLogger(ReadClickhouseData.class);
+    private static long[] timeLimit = getTimeLimit();
 
-    public static HashSet<String> protocolSet;
+    public static final Integer DISTINCT_CLIENT_IP_NUM = ApplicationConfig.DISTINCT_CLIENT_IP_NUM;
+    public static final Integer RECENT_COUNT_HOUR = ApplicationConfig.RECENT_COUNT_HOUR;
+    public static final HashSet<String> PROTOCOL_SET;
 
     static {
-        protocolSet = new HashSet<>();
-        protocolSet.add("HTTP");
-        protocolSet.add("TLS");
-        protocolSet.add("DNS");
+        PROTOCOL_SET = new HashSet<>();
+        PROTOCOL_SET.add("HTTP");
+        PROTOCOL_SET.add("TLS");
+        PROTOCOL_SET.add("DNS");
     }
 
-    public static BaseDocument getVertexFqdnDocument(ResultSet resultSet) throws SQLException {
-        String fqdnName = resultSet.getString("FQDN");
-        BaseDocument newDoc = null;
-        if (isDomain(fqdnName)) {
-            long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME");
-            long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME");
-            newDoc = new BaseDocument();
-            newDoc.setKey(fqdnName);
-            newDoc.addAttribute("FQDN_NAME", fqdnName);
-            newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime);
-            newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime);
-        }
-        return newDoc;
-    }
-
-    public static BaseDocument getVertexIpDocument(ResultSet resultSet) throws SQLException {
-        BaseDocument newDoc = new BaseDocument();
-        String ip = resultSet.getString("IP");
-        long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME");
-        long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME");
-        long sessionCount = resultSet.getLong("SESSION_COUNT");
-        long bytesSum = resultSet.getLong("BYTES_SUM");
-        String ipType = resultSet.getString("ip_type");
-        newDoc.setKey(ip);
-        newDoc.addAttribute("IP", ip);
-        newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime);
-        newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime);
-        switch (ipType) {
-            case "client":
-                newDoc.addAttribute("CLIENT_SESSION_COUNT", sessionCount);
-                newDoc.addAttribute("CLIENT_BYTES_SUM", bytesSum);
-                newDoc.addAttribute("SERVER_SESSION_COUNT", 0L);
-                newDoc.addAttribute("SERVER_BYTES_SUM", 0L);
-                break;
-            case "server":
-                newDoc.addAttribute("SERVER_SESSION_COUNT", sessionCount);
-                newDoc.addAttribute("SERVER_BYTES_SUM", bytesSum);
-                newDoc.addAttribute("CLIENT_SESSION_COUNT", 0L);
-                newDoc.addAttribute("CLIENT_BYTES_SUM", 0L);
-                break;
-            default:
-        }
-        newDoc.addAttribute("COMMON_LINK_INFO", "");
-        return newDoc;
-    }
-
-    public static BaseDocument getVertexSubscriberDocument(ResultSet resultSet) throws SQLException {
-        String subscriberId = resultSet.getString("common_subscriber_id");
-        long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME");
-        long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME");
-        BaseDocument newDoc = new BaseDocument();
-        newDoc.setKey(subscriberId);
-        newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime);
-        newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime);
-        return newDoc;
-    }
-
-    public static BaseEdgeDocument getRelationshipSubsciberLocateIpDocument(ResultSet resultSet) throws SQLException {
-        String subscriberId = resultSet.getString("common_subscriber_id");
-        String framedIp = resultSet.getString("radius_framed_ip");
-        long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME");
-        long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME");
-        long countTotal = resultSet.getLong("COUNT_TOTAL");
-
-        String key = subscriberId + "-" + framedIp;
-        BaseEdgeDocument newDoc = new BaseEdgeDocument();
-        newDoc.setKey(key);
-        newDoc.setFrom("SUBSCRIBER/" + subscriberId);
-        newDoc.setTo("IP/" + framedIp);
-        newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime);
-        newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime);
-        newDoc.addAttribute("COUNT_TOTAL", countTotal);
-
-        return newDoc;
-
-    }
-
-    public static BaseEdgeDocument getRelationFqdnAddressIpDocument(ResultSet resultSet) throws SQLException {
-        String vFqdn = resultSet.getString("FQDN");
-        BaseEdgeDocument newDoc = null;
-        if (isDomain(vFqdn)) {
-            String vIp = resultSet.getString("common_server_ip");
-            long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME");
-            long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME");
-            long countTotal = resultSet.getLong("COUNT_TOTAL");
-            String schemaType = resultSet.getString("schema_type");
-            String[] distCipRecents = (String[]) resultSet.getArray("DIST_CIP_RECENT").getArray();
-            long[] clientIpTs = new long[distCipRecents.length];
-            for (int i = 0; i < clientIpTs.length; i++) {
-                clientIpTs[i] = currentHour;
-            }
-
-            String key = vFqdn + "-" + vIp;
-            newDoc = new BaseEdgeDocument();
-            newDoc.setKey(key);
-            newDoc.setFrom("FQDN/" + vFqdn);
-            newDoc.setTo("IP/" + vIp);
-            newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime);
-            newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime);
-            newDoc.addAttribute("DIST_CIP", distCipRecents);
-            newDoc.addAttribute("DIST_CIP_TS", clientIpTs);
-            newDoc.addAttribute("PROTOCOL_TYPE", schemaType);
-            checkSchemaProperty(newDoc, schemaType, countTotal);
-        }
-        return newDoc;
-    }
-
-    public static BaseEdgeDocument getRelationIpVisitFqdnDocument(ResultSet resultSet) throws SQLException {
-        BaseEdgeDocument newDoc = null;
-        String vFqdn = resultSet.getString("FQDN");
-        if (isDomain(vFqdn)) {
-            String vIp = resultSet.getString("common_client_ip");
-            String key = vIp + "-" + vFqdn;
-            long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME");
-            long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME");
-            long countTotal = resultSet.getLong("COUNT_TOTAL");
-            String schemaType = resultSet.getString("schema_type");
-            newDoc = new BaseEdgeDocument();
-            newDoc.setKey(key);
-            newDoc.setFrom("IP/" + vIp);
-            newDoc.setTo("FQDN/" + vFqdn);
-            newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime);
-            newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime);
-            newDoc.addAttribute("PROTOCOL_TYPE", schemaType);
-            checkSchemaProperty(newDoc, schemaType, countTotal);
-        }
-        return newDoc;
-    }
+    public static BaseDocument getVertexFqdnDocument(ResultSet resultSet) {
+        BaseDocument newDoc = null;
+        try {
+            String fqdnName = resultSet.getString("FQDN");
+            if (isDomain(fqdnName)) {
+                long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME");
+                long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME");
+                newDoc = new BaseDocument();
+                newDoc.setKey(fqdnName);
+                newDoc.addAttribute("FQDN_NAME", fqdnName);
+                newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime);
+                newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime);
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return newDoc;
+    }
+
+    public static BaseDocument getVertexIpDocument(ResultSet resultSet) {
+        BaseDocument newDoc = new BaseDocument();
+        try {
+            String ip = resultSet.getString("IP");
+            long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME");
+            long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME");
+            long sessionCount = resultSet.getLong("SESSION_COUNT");
+            long bytesSum = resultSet.getLong("BYTES_SUM");
+            String ipType = resultSet.getString("ip_type");
+            newDoc.setKey(ip);
+            newDoc.addAttribute("IP", ip);
+            newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime);
+            newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime);
+            switch (ipType) {
+                case "client":
+                    newDoc.addAttribute("CLIENT_SESSION_COUNT", sessionCount);
+                    newDoc.addAttribute("CLIENT_BYTES_SUM", bytesSum);
+                    newDoc.addAttribute("SERVER_SESSION_COUNT", 0L);
+                    newDoc.addAttribute("SERVER_BYTES_SUM", 0L);
+                    break;
+                case "server":
+                    newDoc.addAttribute("SERVER_SESSION_COUNT", sessionCount);
+                    newDoc.addAttribute("SERVER_BYTES_SUM", bytesSum);
+                    newDoc.addAttribute("CLIENT_SESSION_COUNT", 0L);
+                    newDoc.addAttribute("CLIENT_BYTES_SUM", 0L);
+                    break;
+                default:
+            }
+            newDoc.addAttribute("COMMON_LINK_INFO", "");
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return newDoc;
+    }
+
+    public static BaseDocument getVertexSubscriberDocument(ResultSet resultSet) {
+        BaseDocument newDoc = new BaseDocument();
+        try {
+            String subscriberId = resultSet.getString("common_subscriber_id");
+            long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME");
+            long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME");
+            newDoc.setKey(subscriberId);
+            newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime);
+            newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return newDoc;
+    }
+
+    public static BaseEdgeDocument getRelationshipSubsciberLocateIpDocument(ResultSet resultSet) {
+        BaseEdgeDocument newDoc = new BaseEdgeDocument();
+        try {
+            String subscriberId = resultSet.getString("common_subscriber_id");
+            String framedIp = resultSet.getString("radius_framed_ip");
+            long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME");
+            long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME");
+            long countTotal = resultSet.getLong("COUNT_TOTAL");
+            String key = subscriberId + "-" + framedIp;
+            newDoc.setKey(key);
+            newDoc.setFrom("SUBSCRIBER/" + subscriberId);
+            newDoc.setTo("IP/" + framedIp);
+            newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime);
+            newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime);
+            newDoc.addAttribute("COUNT_TOTAL", countTotal);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return newDoc;
+
+    }
+
+    public static BaseEdgeDocument getRelationFqdnAddressIpDocument(ResultSet resultSet) {
+        BaseEdgeDocument newDoc = null;
+        try {
+            String vFqdn = resultSet.getString("FQDN");
+            if (isDomain(vFqdn)) {
+                String vIp = resultSet.getString("common_server_ip");
+                long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME");
+                long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME");
+                long countTotal = resultSet.getLong("COUNT_TOTAL");
+                String schemaType = resultSet.getString("schema_type");
+                String[] distCipRecents = (String[]) resultSet.getArray("DIST_CIP_RECENT").getArray();
+                long[] clientIpTs = new long[distCipRecents.length];
+                for (int i = 0; i < clientIpTs.length; i++) {
+                    clientIpTs[i] = currentHour;
+                }
+
+                String key = vFqdn + "-" + vIp;
+                newDoc = new BaseEdgeDocument();
+                newDoc.setKey(key);
+                newDoc.setFrom("FQDN/" + vFqdn);
+                newDoc.setTo("IP/" + vIp);
+                newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime);
+                newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime);
+                newDoc.addAttribute("DIST_CIP", distCipRecents);
+                newDoc.addAttribute("DIST_CIP_TS", clientIpTs);
+                newDoc.addAttribute("PROTOCOL_TYPE", schemaType);
+                checkSchemaProperty(newDoc, schemaType, countTotal);
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return newDoc;
+    }
+
+    public static BaseEdgeDocument getRelationIpVisitFqdnDocument(ResultSet resultSet) {
+        BaseEdgeDocument newDoc = null;
+        try {
+            String vFqdn = resultSet.getString("FQDN");
+            if (isDomain(vFqdn)) {
+                String vIp = resultSet.getString("common_client_ip");
+                String key = vIp + "-" + vFqdn;
+                long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME");
+                long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME");
+                long countTotal = resultSet.getLong("COUNT_TOTAL");
+                String schemaType = resultSet.getString("schema_type");
+
+                newDoc = new BaseEdgeDocument();
+                newDoc.setKey(key);
+                newDoc.setFrom("IP/" + vIp);
+                newDoc.setTo("FQDN/" + vFqdn);
+                newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime);
+                newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime);
+                newDoc.addAttribute("PROTOCOL_TYPE", schemaType);
+                checkSchemaProperty(newDoc, schemaType, countTotal);
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return newDoc;
+    }
@@ -166,8 +191,8 @@ public class ReadClickhouseData {
     public static <T extends BaseDocument> void putMapByHashcode(T newDoc, HashMap<Integer, HashMap<String, ArrayList<T>>> map) {
         if (newDoc != null) {
             String key = newDoc.getKey();
-            int i = Math.abs(key.hashCode()) % ApplicationConfig.THREAD_POOL_NUMBER;
-            HashMap<String, ArrayList<T>> documentHashMap = map.getOrDefault(i, new HashMap<>());
+            int hashCode = Math.abs(key.hashCode()) % ApplicationConfig.THREAD_POOL_NUMBER;
+            HashMap<String, ArrayList<T>> documentHashMap = map.getOrDefault(hashCode, new HashMap<>());
             ArrayList<T> documentArrayList = documentHashMap.getOrDefault(key, new ArrayList<>());
             documentArrayList.add(newDoc);
             documentHashMap.put(key, documentArrayList);
@@ -176,6 +201,15 @@ public class ReadClickhouseData {
 
     private static boolean isDomain(String fqdn) {
         try {
+            if (fqdn == null || fqdn.length() == 0){
+                return false;
+            }
+            if (fqdn.contains(":")){
+                String s = fqdn.split(":", 2)[1]; // part after the first colon; a second colon means an IPv6 literal, not a domain
+                if (s.contains(":")){
+                    return false;
+                }
+            }
             String[] fqdnArr = fqdn.split("\\.");
             if (fqdnArr.length < 4 || fqdnArr.length > 4) {
                 return true;
@@ -183,7 +217,7 @@ public class ReadClickhouseData {
 
             for (String f : fqdnArr) {
                 if (pattern.matcher(f).matches()) {
-                    int i = Integer.parseInt(f);
+                    long i = Long.parseLong(f);
                     if (i < 0 || i > 255) {
                         return true;
                     }
@@ -198,23 +232,22 @@ public class ReadClickhouseData {
     }
 
     private static void checkSchemaProperty(BaseEdgeDocument newDoc, String schema, long countTotal) {
-        long[] recentCnt = new long[24];
+        long[] recentCnt = new long[RECENT_COUNT_HOUR];
         recentCnt[0] = countTotal;
-        for (String protocol:protocolSet){
-            String protocolRecent = protocol +"_CNT_RECENT";
-            String protocolTotal = protocol + "_CNT_TOTAL";
-            if (protocol.equals(schema)){
+        for (String protocol : PROTOCOL_SET) {
+            String protocolRecent = protocol + "_CNT_RECENT";
+            String protocolTotal = protocol + "_CNT_TOTAL";
+            if (protocol.equals(schema)) {
                 newDoc.addAttribute(protocolTotal, countTotal);
                 newDoc.addAttribute(protocolRecent, recentCnt);
-            }else {
+            } else {
                 newDoc.addAttribute(protocolTotal, 0L);
-                newDoc.addAttribute(protocolRecent, new long[24]);
+                newDoc.addAttribute(protocolRecent, new long[RECENT_COUNT_HOUR]);
             }
         }
     }
 
     public static String getVertexFqdnSql() {
-        long[] timeLimit = getTimeLimit();
         long maxTime = timeLimit[0];
         long minTime = timeLimit[1];
         String where = "common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime;
@@ -224,7 +257,6 @@ public class ReadClickhouseData {
     }
 
     public static String getVertexIpSql() {
-        long[] timeLimit = getTimeLimit();
         long maxTime = timeLimit[0];
         long minTime = timeLimit[1];
         String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime;
@@ -234,17 +266,15 @@ public class ReadClickhouseData {
     }
 
     public static String getRelationshipFqdnAddressIpSql() {
-        long[] timeLimit = getTimeLimit();
         long maxTime = timeLimit[0];
         long minTime = timeLimit[1];
         String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime;
-        String sslSql = "SELECT ssl_sni AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,groupUniqArray(10000)(common_client_ip) AS DIST_CIP_RECENT,'TLS' AS schema_type FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'SSL' GROUP BY ssl_sni,common_server_ip";
-        String httpSql = "SELECT http_host AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,groupUniqArray(10000)(common_client_ip) AS DIST_CIP_RECENT,'HTTP' AS schema_type FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'HTTP' GROUP BY http_host,common_server_ip";
+        String sslSql = "SELECT ssl_sni AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,groupUniqArray("+DISTINCT_CLIENT_IP_NUM+")(common_client_ip) AS DIST_CIP_RECENT,'TLS' AS schema_type FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'SSL' GROUP BY ssl_sni,common_server_ip";
+        String httpSql = "SELECT http_host AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,groupUniqArray("+DISTINCT_CLIENT_IP_NUM+")(common_client_ip) AS DIST_CIP_RECENT,'HTTP' AS schema_type FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'HTTP' GROUP BY http_host,common_server_ip";
         return "SELECT * FROM ((" + sslSql + ") UNION ALL (" + httpSql + "))WHERE FQDN != ''";
     }
 
     public static String getRelationshipIpVisitFqdnSql() {
-        long[] timeLimit = getTimeLimit();
         long maxTime = timeLimit[0];
         long minTime = timeLimit[1];
         String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime;
@@ -254,7 +284,6 @@ public class ReadClickhouseData {
     }
 
     public static String getVertexSubscriberSql() {
-        long[] timeLimit = getTimeLimit();
         long maxTime = timeLimit[0];
         long minTime = timeLimit[1];
         String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime + " AND common_subscriber_id != '' AND radius_packet_type = 4 AND radius_acct_status_type = 1";
@@ -262,7 +291,6 @@ public class ReadClickhouseData {
     }
 
     public static String getRelationshipSubsciberLocateIpSql() {
-        long[] timeLimit = getTimeLimit();
         long maxTime = timeLimit[0];
         long minTime = timeLimit[1];
         String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime + " AND common_subscriber_id != '' AND radius_framed_ip != '' AND radius_packet_type = 4 AND radius_acct_status_type = 1";
@@ -270,10 +298,19 @@ public class ReadClickhouseData {
     }
 
     private static long[] getTimeLimit() {
-        long maxTime = currentHour;
-        long minTime = maxTime - 3600;
-//        long maxTime = ApplicationConfig.READ_CLICKHOUSE_MAX_TIME;
-//        long minTime = ApplicationConfig.READ_CLICKHOUSE_MIN_TIME;
+        long maxTime = 0L;
+        long minTime = 0L;
+        switch (ApplicationConfig.TIME_LIMIT_TYPE) {
+            case 0:
+                maxTime = currentHour;
+                minTime = maxTime - ApplicationConfig.UPDATE_INTERVAL;
+                break;
+            case 1:
+                maxTime = ApplicationConfig.READ_CLICKHOUSE_MAX_TIME;
+                minTime = ApplicationConfig.READ_CLICKHOUSE_MIN_TIME;
+                break;
+            default:
+        }
         return new long[]{maxTime, minTime};
     }
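getTimeLimit now switches between a trailing window (time.limit.type=0) and a fixed range for backfills (time.limit.type=1). A standalone sketch with the constants inlined from application.properties (local stand-in names):

    // Two time-window modes for reading ClickHouse, selected by time.limit.type.
    public class TimeLimitSketch {
        static int TIME_LIMIT_TYPE = 1;            // 0: trailing window, 1: fixed range
        static final int UPDATE_INTERVAL = 3600;   // seconds
        static final long READ_CLICKHOUSE_MAX_TIME = 1595489408L;
        static final long READ_CLICKHOUSE_MIN_TIME = 1593878400L;

        static long[] getTimeLimit(long currentHour) {
            long maxTime = 0L;
            long minTime = 0L;
            switch (TIME_LIMIT_TYPE) {
                case 0: // trailing window ending at the current hour
                    maxTime = currentHour;
                    minTime = maxTime - UPDATE_INTERVAL;
                    break;
                case 1: // fixed range for reprocessing historical data
                    maxTime = READ_CLICKHOUSE_MAX_TIME;
                    minTime = READ_CLICKHOUSE_MIN_TIME;
                    break;
                default:
            }
            return new long[]{maxTime, minTime};
        }

        public static void main(String[] args) {
            long currentHour = System.currentTimeMillis() / 1000 / 3600 * 3600;
            long[] limit = getTimeLimit(currentHour);
            System.out.println("max=" + limit[0] + " min=" + limit[1]);
        }
    }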
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadHistoryArangoData.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadHistoryArangoData.java
index 03baf7f..9b26fdc 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadHistoryArangoData.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadHistoryArangoData.java
@@ -1,5 +1,6 @@
 package cn.ac.iie.service.ingestion;
 
+import cn.ac.iie.config.ApplicationConfig;
 import cn.ac.iie.utils.ArangoDBConnect;
 import com.arangodb.ArangoCursor;
 import com.arangodb.entity.BaseDocument;
@@ -11,6 +12,8 @@ import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 
+import static cn.ac.iie.service.ingestion.ReadClickhouseData.RECENT_COUNT_HOUR;
+
 /**
  * @author wlh
  * 多线程全量读取arangoDb历史数据,封装到map
@@ -20,12 +23,17 @@ public class ReadHistoryArangoData<T extends BaseDocument> extends Thread {
 
     private ArangoDBConnect arangoConnect;
     private String query;
-    private ConcurrentHashMap<String, T> map;
+    private ConcurrentHashMap<Integer, ConcurrentHashMap<String, T>> map;
     private Class<T> type;
     private String table;
     private CountDownLatch countDownLatch;
 
-    public ReadHistoryArangoData(ArangoDBConnect arangoConnect, String query, ConcurrentHashMap<String, T> map, Class<T> type, String table, CountDownLatch countDownLatch) {
+    public ReadHistoryArangoData(ArangoDBConnect arangoConnect,
+                                 String query,
+                                 ConcurrentHashMap<Integer, ConcurrentHashMap<String, T>> map,
+                                 Class<T> type,
+                                 String table,
+                                 CountDownLatch countDownLatch) {
         this.arangoConnect = arangoConnect;
         this.query = query;
         this.map = map;
@@ -53,26 +61,29 @@ public class ReadHistoryArangoData<T extends BaseDocument> extends Thread {
                         break;
                     default:
                 }
-                map.put(key, doc);
+                int hashCode = Math.abs(key.hashCode()) % ApplicationConfig.THREAD_POOL_NUMBER;
+                ConcurrentHashMap<String, T> tmpMap = map.get(hashCode);
+                tmpMap.put(key, doc);
                 i++;
             }
             long l = System.currentTimeMillis();
-            LOG.info(query + "\n读取数据" + i + "条,运行时间:" + (l - s));
+            LOG.info(query + "\n读取" + i + "条数据,运行时间:" + (l - s));
             }
         }catch (Exception e){
             e.printStackTrace();
         }finally {
             countDownLatch.countDown();
+            LOG.info("本线程读取完毕,剩余线程数量:"+countDownLatch.getCount());
         }
     }
 
     private void updateProtocolDocument(T doc) {
         if (doc.getProperties().containsKey("PROTOCOL_TYPE")) {
-            for (String protocol : ReadClickhouseData.protocolSet) {
+            for (String protocol : ReadClickhouseData.PROTOCOL_SET) {
                 String protocolRecent = protocol + "_CNT_RECENT";
                 ArrayList<Long> cntRecent = (ArrayList<Long>) doc.getAttribute(protocolRecent);
                 Long[] cntRecentsSrc = cntRecent.toArray(new Long[cntRecent.size()]);
-                Long[] cntRecentsDst = new Long[24];
+                Long[] cntRecentsDst = new Long[RECENT_COUNT_HOUR];
                 System.arraycopy(cntRecentsSrc, 0, cntRecentsDst, 1, cntRecentsSrc.length - 1);
                 cntRecentsDst[0] = 0L;
                 doc.addAttribute(protocolRecent, cntRecentsDst);
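updateProtocolDocument ages each *_CNT_RECENT array by one hour: System.arraycopy shifts every bucket one slot to the right, the oldest bucket falls off the end, and slot 0 is zeroed for the hour about to be ingested. The same step in isolation (RECENT_COUNT_HOUR assumed 24, per the new property):

    import java.util.Arrays;

    // Hourly shift of a recent-count ring: index k holds the count from k hours ago.
    public class RecentCountShiftSketch {
        public static void main(String[] args) {
            int recentCountHour = 24;
            Long[] src = new Long[recentCountHour];
            Arrays.fill(src, 0L);
            src[0] = 42L; // counts recorded in the most recent hour

            Long[] dst = new Long[recentCountHour];
            System.arraycopy(src, 0, dst, 1, src.length - 1); // oldest bucket is dropped
            dst[0] = 0L; // fresh bucket for the incoming hour

            System.out.println(dst[1]); // 42, last hour's count moved back one slot
        }
    }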
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Document.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Document.java
index 1c9203d..8d69b46 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Document.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Document.java
@@ -8,7 +8,6 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
@@ -20,29 +19,28 @@ public class Document<T extends BaseDocument> extends Thread{
     private String collectionName;
     private ConcurrentHashMap<String, T> historyDocumentMap;
     private CountDownLatch countDownLatch;
-    private Class<T> type;
 
     Document(HashMap<String, ArrayList<T>> newDocumentMap,
              ArangoDBConnect arangoManger,
              String collectionName,
              ConcurrentHashMap<String, T> historyDocumentMap,
-             CountDownLatch countDownLatch,
-             Class<T> type) {
+             CountDownLatch countDownLatch) {
         this.newDocumentMap = newDocumentMap;
         this.arangoManger = arangoManger;
         this.collectionName = collectionName;
         this.historyDocumentMap = historyDocumentMap;
         this.countDownLatch = countDownLatch;
-        this.type = type;
     }
 
     @Override
     public void run() {
-        Set<String> keySet = newDocumentMap.keySet();
-        ArrayList<T> resultDocumentList = new ArrayList<>();
-        int i = 0;
+        long start = System.currentTimeMillis();
+        LOG.info("新读取数据"+newDocumentMap.size()+"条,历史数据"+historyDocumentMap.size()+"条");
         try {
+            Set<String> keySet = newDocumentMap.keySet();
+            ArrayList<T> resultDocumentList = new ArrayList<>();
+            int i = 0;
             for (String key : keySet) {
                 ArrayList<T> newDocumentSchemaList = newDocumentMap.getOrDefault(key, null);
                 if (newDocumentSchemaList != null) {
@@ -66,6 +64,8 @@ public class Document<T extends BaseDocument> extends Thread{
             LOG.error(e.toString());
         }finally {
             countDownLatch.countDown();
+            long last = System.currentTimeMillis();
+            LOG.info("本线程更新完毕,用时:"+(last-start)+",剩余线程数量:"+countDownLatch.getCount());
         }
     }
 
@@ -83,35 +83,45 @@ public class Document<T extends BaseDocument> extends Thread{
         historyDocument.addAttribute("LAST_FOUND_TIME",lastFoundTime);
     }
 
-    private T mergeDocument(ArrayList<T> newDocumentSchemaList) throws IllegalAccessException, InstantiationException {
+    private T mergeDocument(ArrayList<T> newDocumentSchemaList){
         if (newDocumentSchemaList == null || newDocumentSchemaList.isEmpty()){
             return null;
         }else if (newDocumentSchemaList.size() == 1){
             return newDocumentSchemaList.get(0);
         }else {
-            T newDocument = type.newInstance();
-            Map<String, Object> newProperties = newDocument.getProperties();
-            for (T doc:newDocumentSchemaList){
-                if (newProperties.isEmpty()){
-                    newDocument = doc;
-                    newProperties = doc.getProperties();
+            T newDocument = null;
+            for (T lastDoc:newDocumentSchemaList){
+                if (newDocument == null){
+                    newDocument = lastDoc;
                 }else {
-                    mergeFunction(newProperties,doc);
+                    mergeFunction(lastDoc,newDocument);
                 }
             }
-            newDocument.setProperties(newProperties);
             return newDocument;
         }
     }
 
-    protected void mergeFunction(Map<String, Object> newProperties, T lastDoc) {
-        long firstFoundTime = Long.parseLong(newProperties.getOrDefault("FIRST_FOUND_TIME", 0L).toString());
-        long docFirstFoundTime = Long.parseLong(lastDoc.getAttribute("FIRST_FOUND_TIME").toString());
-        newProperties.put("FIRST_FOUND_TIME",firstFoundTime<docFirstFoundTime? firstFoundTime:docFirstFoundTime);
-        long lastFoundTime = Long.parseLong(newProperties.getOrDefault("LAST_FOUND_TIME", 0L).toString());
-        long docLastFoundTime = Long.parseLong(lastDoc.getAttribute("LAST_FOUND_TIME").toString());
-        newProperties.put("LAST_FOUND_TIME",lastFoundTime>docLastFoundTime? lastFoundTime:docLastFoundTime);
-    }
+    protected void mergeFunction(T lastDoc,T newDocument) {
+        putMinAttribute(lastDoc,newDocument,"FIRST_FOUND_TIME");
+        putMaxAttribute(lastDoc,newDocument,"LAST_FOUND_TIME");
+    }
+
+    protected void putMinAttribute(T firstDoc,T lastDoc,String attribute){
+        long firstMinAttribute = Long.parseLong(firstDoc.getAttribute(attribute).toString());
+        long lastMinAttribute = Long.parseLong(lastDoc.getAttribute(attribute).toString());
+        lastDoc.addAttribute(attribute,firstMinAttribute<lastMinAttribute? firstMinAttribute:lastMinAttribute);
+    }
+
+    protected void putMaxAttribute(T firstDoc,T lastDoc,String attribute){
+        long firstMaxAttribute = Long.parseLong(firstDoc.getAttribute(attribute).toString());
+        long lastMaxAttribute = Long.parseLong(lastDoc.getAttribute(attribute).toString());
+        lastDoc.addAttribute(attribute,firstMaxAttribute>lastMaxAttribute? firstMaxAttribute:lastMaxAttribute);
+    }
+
+    protected void putSumAttribute(T firstDoc,T lastDoc,String attribute){
+        long firstSumAttribute = Long.parseLong(firstDoc.getAttribute(attribute).toString());
+        long lastSumAttribute = Long.parseLong(lastDoc.getAttribute(attribute).toString());
+        lastDoc.addAttribute(attribute,firstSumAttribute+lastSumAttribute);
+    }
 }
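The helpers above reduce every merge to three primitives: keep the earliest FIRST_FOUND_TIME, the latest LAST_FOUND_TIME, and the sum of counters. A plain-Java sketch of the same logic, with a Map standing in for BaseDocument attributes:

    import java.util.HashMap;
    import java.util.Map;

    // putMin/putMax/putSum in miniature: each reads the attribute from both
    // documents and writes the combined value into the second one.
    public class MergeHelpersSketch {
        static void putMin(Map<String, Object> first, Map<String, Object> last, String attr) {
            long a = Long.parseLong(first.get(attr).toString());
            long b = Long.parseLong(last.get(attr).toString());
            last.put(attr, Math.min(a, b));
        }

        static void putMax(Map<String, Object> first, Map<String, Object> last, String attr) {
            long a = Long.parseLong(first.get(attr).toString());
            long b = Long.parseLong(last.get(attr).toString());
            last.put(attr, Math.max(a, b));
        }

        static void putSum(Map<String, Object> first, Map<String, Object> last, String attr) {
            long a = Long.parseLong(first.get(attr).toString());
            long b = Long.parseLong(last.get(attr).toString());
            last.put(attr, a + b);
        }

        public static void main(String[] args) {
            Map<String, Object> incoming = new HashMap<>(Map.of("FIRST_FOUND_TIME", 100L, "LAST_FOUND_TIME", 900L));
            Map<String, Object> merged = new HashMap<>(Map.of("FIRST_FOUND_TIME", 200L, "LAST_FOUND_TIME", 500L));
            putMin(incoming, merged, "FIRST_FOUND_TIME"); // earliest sighting wins
            putMax(incoming, merged, "LAST_FOUND_TIME");  // latest sighting wins
            System.out.println(merged); // {FIRST_FOUND_TIME=100, LAST_FOUND_TIME=900}
        }
    }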
"_CNT_TOTAL"; - long httpCntTotal = Long.parseLong(lastDoc.getAttribute(protocolTotal).toString()); - newProperties.put(protocolTotal, httpCntTotal); - long[] httpCntRecents = (long[]) lastDoc.getAttribute(protocolRecent); - newProperties.put(protocolRecent, httpCntRecents); - String protocolType = newProperties.get("PROTOCOL_TYPE").toString(); - newProperties.put("PROTOCOL_TYPE",addProcotolType(protocolType,protocol)); + putSumAttribute(lastDoc,newDocument,protocolTotal); + long[] cntRecents = (long[]) lastDoc.getAttribute(protocolRecent); + newDocument.addAttribute(protocolRecent, cntRecents); + String protocolType = newDocument.getAttribute("PROTOCOL_TYPE").toString(); + newDocument.addAttribute("PROTOCOL_TYPE",addProcotolType(protocolType,protocol)); } private String addProcotolType(String protocolType,String schema){ diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Vertex.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Vertex.java index 322a995..10a4d29 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Vertex.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Vertex.java @@ -5,7 +5,6 @@ import com.arangodb.entity.BaseDocument; import java.util.ArrayList; import java.util.HashMap; -import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -20,22 +19,7 @@ public class Vertex extends Document { String collectionName, ConcurrentHashMap historyDocumentMap, CountDownLatch countDownLatch) { - super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap, countDownLatch,BaseDocument.class); - } - - @Override - protected void updateFunction(BaseDocument newDocument, BaseDocument historyDocument) { - super.updateFunction(newDocument, historyDocument); - } - - @Override - protected void mergeFunction(Map properties, BaseDocument doc) { - super.mergeFunction(properties, doc); - } - - @Override - public void run() { - super.run(); + super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap, countDownLatch); } } diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/relationship/LocateFqdn2Ip.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/relationship/LocateFqdn2Ip.java index 21904cc..c8dca13 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/relationship/LocateFqdn2Ip.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/relationship/LocateFqdn2Ip.java @@ -9,6 +9,9 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import static cn.ac.iie.service.ingestion.ReadClickhouseData.DISTINCT_CLIENT_IP_NUM; +import static cn.ac.iie.service.ingestion.ReadClickhouseData.currentHour; + public class LocateFqdn2Ip extends Relationship { public LocateFqdn2Ip(HashMap> newDocumentHashMap, @@ -20,15 +23,30 @@ public class LocateFqdn2Ip extends Relationship { } @Override - protected void mergeFunction(Map properties, BaseEdgeDocument schemaEdgeDoc){ - super.mergeFunction(properties,schemaEdgeDoc); - mergeProtocol(properties, schemaEdgeDoc); + protected void mergeFunction(BaseEdgeDocument lastDoc,BaseEdgeDocument newDocument){ + super.mergeFunction(lastDoc, newDocument); + mergeDistinctClientIp(lastDoc, newDocument); + mergeProtocol(lastDoc, newDocument); + } + + private void mergeDistinctClientIp(BaseEdgeDocument lastDoc,BaseEdgeDocument newDocument){ + HashSet clientIpSet = new HashSet<>(); + String[] distCips = (String[]) 
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/relationship/LocateFqdn2Ip.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/relationship/LocateFqdn2Ip.java
index 21904cc..c8dca13 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/relationship/LocateFqdn2Ip.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/relationship/LocateFqdn2Ip.java
@@ -9,6 +9,9 @@ import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 
+import static cn.ac.iie.service.ingestion.ReadClickhouseData.DISTINCT_CLIENT_IP_NUM;
+import static cn.ac.iie.service.ingestion.ReadClickhouseData.currentHour;
+
 public class LocateFqdn2Ip extends Relationship {
 
     public LocateFqdn2Ip(HashMap<String, ArrayList<BaseEdgeDocument>> newDocumentHashMap,
@@ -20,15 +23,30 @@ public class LocateFqdn2Ip extends Relationship {
     }
 
     @Override
-    protected void mergeFunction(Map<String, Object> properties, BaseEdgeDocument schemaEdgeDoc){
-        super.mergeFunction(properties,schemaEdgeDoc);
-        mergeProtocol(properties, schemaEdgeDoc);
+    protected void mergeFunction(BaseEdgeDocument lastDoc,BaseEdgeDocument newDocument){
+        super.mergeFunction(lastDoc, newDocument);
+        mergeDistinctClientIp(lastDoc, newDocument);
+        mergeProtocol(lastDoc, newDocument);
+    }
+
+    private void mergeDistinctClientIp(BaseEdgeDocument lastDoc,BaseEdgeDocument newDocument){
+        HashSet<String> clientIpSet = new HashSet<>();
+        String[] distCips = (String[]) newDocument.getAttribute("DIST_CIP");
+        String[] lastDistCips = (String[]) lastDoc.getAttribute("DIST_CIP");
+        clientIpSet.addAll(Arrays.asList(distCips));
+        clientIpSet.addAll(Arrays.asList(lastDistCips));
+        long[] clientIpTs = new long[clientIpSet.size()];
+        for (int i = 0; i < clientIpTs.length; i++) {
+            clientIpTs[i] = currentHour;
+        }
+        newDocument.addAttribute("DIST_CIP", clientIpSet.toArray());
+        newDocument.addAttribute("DIST_CIP_TS", clientIpTs);
     }
 
     @Override
     protected void updateFunction(BaseEdgeDocument newEdgeDocument, BaseEdgeDocument historyEdgeDocument) {
         super.updateFunction(newEdgeDocument, historyEdgeDocument);
-        for (String schema:ReadClickhouseData.protocolSet){
+        for (String schema:ReadClickhouseData.PROTOCOL_SET){
             updateProcotol(historyEdgeDocument,schema,newEdgeDocument);
         }
         updateDistinctClientIp(newEdgeDocument, historyEdgeDocument);
@@ -45,7 +63,7 @@ public class LocateFqdn2Ip extends Relationship {
         }
         Object[] distCipRecent = (Object[])newEdgeDocument.getAttribute("DIST_CIP");
         for (Object cip:distCipRecent){
-            distCipToTs.put(cip.toString(), ReadClickhouseData.currentHour);
+            distCipToTs.put(cip.toString(), currentHour);
         }
 
         Map<String, Long> sortDistCip = sortMapByValue(distCipToTs);
@@ -65,8 +83,8 @@ public class LocateFqdn2Ip extends Relationship {
         List<Map.Entry<String, Long>> entryList = new ArrayList<>(oriMap.entrySet());
         entryList.sort((o1, o2) -> o2.getValue().compareTo(o1.getValue()));
 
-        if(entryList.size() > 10000){
-            for(Map.Entry<String, Long> set:entryList.subList(0, 10000)){
+        if(entryList.size() > DISTINCT_CLIENT_IP_NUM){
+            for(Map.Entry<String, Long> set:entryList.subList(0, DISTINCT_CLIENT_IP_NUM)){
                 sortedMap.put(set.getKey(), set.getValue());
             }
         }else {
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/relationship/VisitIp2Fqdn.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/relationship/VisitIp2Fqdn.java
index 431927f..e240529 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/relationship/VisitIp2Fqdn.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/relationship/VisitIp2Fqdn.java
@@ -23,14 +23,14 @@ public class VisitIp2Fqdn extends Relationship {
     @Override
     protected void updateFunction(BaseEdgeDocument newEdgeDocument, BaseEdgeDocument historyEdgeDocument) {
         super.updateFunction(newEdgeDocument, historyEdgeDocument);
-        for (String schema: ReadClickhouseData.protocolSet){
+        for (String schema: ReadClickhouseData.PROTOCOL_SET){
             updateProcotol(historyEdgeDocument,schema,newEdgeDocument);
         }
     }
 
     @Override
-    protected void mergeFunction(Map<String, Object> newProperties, BaseEdgeDocument lastDoc) {
-        super.mergeFunction(newProperties, lastDoc);
-        mergeProtocol(newProperties, lastDoc);
+    protected void mergeFunction(BaseEdgeDocument lastDoc,BaseEdgeDocument newDocument) {
+        super.mergeFunction(lastDoc, newDocument);
+        mergeProtocol(lastDoc,newDocument);
     }
 }
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/vertex/Ip.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/vertex/Ip.java
index 4cdedbd..61bb0b9 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/vertex/Ip.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/vertex/Ip.java
@@ -6,7 +6,6 @@ import com.arangodb.entity.BaseDocument;
 
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 
@@ -27,53 +26,23 @@ public class Ip extends Vertex {
     }
 
     @Override
-    protected void mergeFunction(Map<String, Object> properties, BaseDocument doc) {
-        super.mergeFunction(properties, doc);
-        mergeIpByType(properties,doc);
+    protected void mergeFunction(BaseDocument lastDoc, BaseDocument newDocument) {
+        super.mergeFunction(lastDoc, newDocument);
+        mergeIpByType(lastDoc, newDocument);
     }
 
-    private void mergeIpByType(Map<String, Object> properties, BaseDocument doc){
-        Map<String, Object> mergeProperties = doc.getProperties();
-        checkIpTypeProperty(properties,mergeProperties,"CLIENT_SESSION_COUNT");
-        checkIpTypeProperty(properties,mergeProperties,"CLIENT_BYTES_SUM");
-        checkIpTypeProperty(properties,mergeProperties,"SERVER_SESSION_COUNT");
-        checkIpTypeProperty(properties,mergeProperties,"SERVER_BYTES_SUM");
+    private void mergeIpByType(BaseDocument lastDoc, BaseDocument newDocument) {
+        putSumAttribute(lastDoc,newDocument,"CLIENT_SESSION_COUNT");
+        putSumAttribute(lastDoc,newDocument,"CLIENT_BYTES_SUM");
+        putSumAttribute(lastDoc,newDocument,"SERVER_SESSION_COUNT");
+        putSumAttribute(lastDoc,newDocument,"SERVER_BYTES_SUM");
     }
 
-    private void checkIpTypeProperty(Map<String, Object> properties,Map<String, Object> mergeProperties,String property){
-        try {
-            if (!properties.containsKey(property)){
-                properties.put(property,0L);
-                checkIpTypeProperty(properties,mergeProperties,property);
-            }else if ("0".equals(properties.get(property).toString()) && mergeProperties.containsKey(property)){
-                if (!"0".equals(mergeProperties.get(property).toString())){
-                    properties.put(property,Long.parseLong(mergeProperties.get(property).toString()));
-                }
-            }
-        }catch (Exception e){
-            e.printStackTrace();
-        }
-    }
-
-    private void updateIpByType(BaseDocument newDocument, BaseDocument historyDocument){
-        addProperty(newDocument,historyDocument,"CLIENT_SESSION_COUNT");
-        addProperty(newDocument,historyDocument,"CLIENT_BYTES_SUM");
-        addProperty(newDocument,historyDocument,"SERVER_SESSION_COUNT");
-        addProperty(newDocument,historyDocument,"SERVER_BYTES_SUM");
-    }
-
-    private void addProperty(BaseDocument newDocument, BaseDocument historyDocument,String property){
-        try {
-            if (historyDocument.getProperties().containsKey(property)){
-                long newProperty = Long.parseLong(newDocument.getAttribute(property).toString());
-                long hisProperty = Long.parseLong(historyDocument.getAttribute(property).toString());
-                historyDocument.updateAttribute(property,newProperty+hisProperty);
-            }else {
-                historyDocument.addAttribute(property,0L);
-            }
-        }catch (Exception e){
-            e.printStackTrace();
-        }
+    private void updateIpByType(BaseDocument newDocument, BaseDocument historyDocument) {
+        putSumAttribute(newDocument, historyDocument, "CLIENT_SESSION_COUNT");
+        putSumAttribute(newDocument, historyDocument, "CLIENT_BYTES_SUM");
+        putSumAttribute(newDocument, historyDocument, "SERVER_SESSION_COUNT");
+        putSumAttribute(newDocument, historyDocument, "SERVER_BYTES_SUM");
     }
 }
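LocateFqdn2Ip's client-IP bookkeeping has two halves: merging unions the DIST_CIP sets and restamps every entry with currentHour, while updating sorts IPs by last-seen timestamp and keeps only the newest DISTINCT_CLIENT_IP_NUM entries. A sketch of the trim step, with a cap of 3 instead of the configured 10000 so the output stays short:

    import java.util.*;

    // Keep only the N most recently seen client IPs, newest first.
    public class DistinctClientIpSketch {
        public static void main(String[] args) {
            int distinctClientIpNum = 3;
            Map<String, Long> ipToLastSeenHour = new HashMap<>();
            ipToLastSeenHour.put("10.0.0.1", 100L);
            ipToLastSeenHour.put("10.0.0.2", 104L);
            ipToLastSeenHour.put("10.0.0.3", 101L);
            ipToLastSeenHour.put("10.0.0.4", 103L);

            List<Map.Entry<String, Long>> entries = new ArrayList<>(ipToLastSeenHour.entrySet());
            entries.sort((a, b) -> b.getValue().compareTo(a.getValue())); // newest first

            Map<String, Long> trimmed = new LinkedHashMap<>();
            for (Map.Entry<String, Long> e : entries.subList(0, Math.min(entries.size(), distinctClientIpNum))) {
                trimmed.put(e.getKey(), e.getValue());
            }
            System.out.println(trimmed); // {10.0.0.2=104, 10.0.0.4=103, 10.0.0.3=101}
        }
    }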
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java b/IP-learning-graph/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java
index e81e7a8..fd4d91e 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java
@@ -71,27 +71,6 @@ public class ArangoDBConnect {
         }
     }
 
-    @Deprecated
-    public <T extends BaseDocument> void insertAndUpdate(ArrayList<T> docInsert,ArrayList<T> docUpdate,String collectionName){
-        ArangoDatabase database = getDatabase();
-        try {
-            ArangoCollection collection = database.collection(collectionName);
-            if (!docInsert.isEmpty()){
-                DocumentCreateOptions documentCreateOptions = new DocumentCreateOptions();
-//                documentCreateOptions.overwrite(true);
-                collection.insertDocuments(docInsert, documentCreateOptions);
-            }
-            if (!docUpdate.isEmpty()){
-                collection.replaceDocuments(docUpdate);
-            }
-        }catch (Exception e){
-            LOG.error("更新失败\n"+e.toString());
-        }finally {
-            docInsert.clear();
-            docInsert.clear();
-        }
-    }
-
     public <T extends BaseDocument> void overwrite(ArrayList<T> docOverwrite,String collectionName){
         ArangoDatabase database = getDatabase();
         try {
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/utils/ExecutorThreadPool.java b/IP-learning-graph/src/main/java/cn/ac/iie/utils/ExecutorThreadPool.java
index e7b83ec..b47e796 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/utils/ExecutorThreadPool.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/utils/ExecutorThreadPool.java
@@ -20,7 +20,7 @@ public class ExecutorThreadPool {
         ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
                 .setNameFormat("iplearning-application-pool-%d").build();
         pool = new ThreadPoolExecutor(5, ApplicationConfig.THREAD_POOL_NUMBER,
-                0L, TimeUnit.MILLISECONDS,
+                0L, TimeUnit.SECONDS,
                 new LinkedBlockingQueue<>(1024),
                 namedThreadFactory,
                 new ThreadPoolExecutor.AbortPolicy());
     }
diff --git a/IP-learning-graph/src/main/resources/application.properties b/IP-learning-graph/src/main/resources/application.properties
index e7f5186..e639891 100644
--- a/IP-learning-graph/src/main/resources/application.properties
+++ b/IP-learning-graph/src/main/resources/application.properties
@@ -1,10 +1,10 @@
 #arangoDB参数配置
-arangoDB.host=192.168.40.127
+arangoDB.host=192.168.40.182
 arangoDB.port=8529
 arangoDB.user=root
 arangoDB.password=111111
 #arangoDB.DB.name=ip-learning-test
-arangoDB.DB.name=ip-learning-test
+arangoDB.DB.name=ip-learning-test-0
 arangoDB.batch=100000
 arangoDB.ttl=3600
 
@@ -13,5 +13,13 @@ update.arango.batch=10000
 thread.pool.number=10
 thread.await.termination.time=10
 
-read.clickhouse.max.time=1594981808
-read.clickhouse.min.time=1593878400
\ No newline at end of file
+
+#读取clickhouse时间范围方式,0:读取过去一小时,1:指定时间范围
+time.limit.type=1
+read.clickhouse.max.time=1595489408
+read.clickhouse.min.time=1593878400
+
+update.interval=3600
+distinct.client.ip.num=10000
+recent.count.hour=24
+
diff --git a/IP-learning-graph/src/main/resources/log4j.properties b/IP-learning-graph/src/main/resources/log4j.properties
index 21cea3d..2366506 100644
--- a/IP-learning-graph/src/main/resources/log4j.properties
+++ b/IP-learning-graph/src/main/resources/log4j.properties
@@ -17,6 +17,7 @@ log4j.appender.file.encoding=UTF-8
 log4j.appender.file.Append=true
 #日志输出到相应目录
 #log4j.appender.file.file=/home/ceiec/iplearning/logs/ip-learning-application.log
+#log4j.appender.file.file=/home/ceiec/iplearning/testLog/ip-learning-application.log
 log4j.appender.file.file=./logs/ip-learning-application.log
 log4j.appender.file.DatePattern='.'yyyy-MM-dd
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
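One caveat worth noting on the ExecutorThreadPool hunk above: with corePoolSize 5 and a 1024-slot queue, submissions 6 through 10 wait in the queue instead of starting extra threads, because ThreadPoolExecutor only grows past the core size once the queue is full (and with a keepAlive of 0, the change from MILLISECONDS to SECONDS has no practical effect). A runnable sketch of that behavior (default thread factory instead of the project's guava ThreadFactoryBuilder):

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    // Same pool shape as ExecutorThreadPool: core 5, max 10, bounded queue,
    // AbortPolicy. Submitting 10 quick tasks exercises only the 5 core threads.
    public class PoolConfigSketch {
        public static void main(String[] args) throws InterruptedException {
            ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 10,
                    0L, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(1024),
                    new ThreadPoolExecutor.AbortPolicy());
            CountDownLatch latch = new CountDownLatch(10);
            for (int i = 0; i < 10; i++) {
                pool.execute(() -> {
                    System.out.println(Thread.currentThread().getName());
                    latch.countDown();
                });
            }
            latch.await(); // at most five distinct worker names appear in the output
            pool.shutdown();
        }
    }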