From a301d6b402b7d0ed1cf9b163283edbda8ead2e60 Mon Sep 17 00:00:00 2001 From: wanglihui <949764788@qq.com> Date: Wed, 15 Jul 2020 19:33:59 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8A=BD=E8=B1=A1document=E7=88=B6=E7=B1=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ip-learning-java-test/pom.xml | 15 +- .../java/cn/ac/iie/dao/BaseArangoData.java | 58 ++- .../cn/ac/iie/dao/BaseClickhouseData.java | 338 ++++++------------ .../java/cn/ac/iie/dao/UpdateGraphData.java | 157 ++++++++ .../iie/etl/relationship/LocateFqdn2Ip.java | 31 -- .../java/cn/ac/iie/etl/update/Vertex.java | 116 ------ .../main/java/cn/ac/iie/etl/vertex/Ip.java | 20 -- .../UpdateEFqdnAddressIp.java | 2 +- .../{etl => service}/UpdateEIpVisitFqdn.java | 2 +- .../ac/iie/{etl => service}/UpdateVFqdn.java | 2 +- .../cn/ac/iie/{etl => service}/UpdateVIP.java | 2 +- .../iie/service/read/ReadClickhouseData.java | 251 +++++++++++++ .../read/ReadHistoryArangoData.java | 28 +- .../service/relationship/LocateFqdn2Ip.java | 76 ++++ .../relationship/LocateSubscriber2Ip.java | 32 ++ .../relationship/VisitIp2Fqdn.java | 13 +- .../cn/ac/iie/service/update/Document.java | 118 ++++++ .../{etl => service}/update/Relationship.java | 123 +++---- .../java/cn/ac/iie/service/update/Vertex.java | 40 +++ .../ac/iie/{etl => service}/vertex/Fqdn.java | 11 +- .../java/cn/ac/iie/service/vertex/Ip.java | 79 ++++ .../cn/ac/iie/service/vertex/Subscriber.java | 21 ++ .../iie/test/IpLearningApplicationTest.java | 71 +--- .../ac/iie/test/ReadArangoDBThreadTest.java | 95 ----- .../java/cn/ac/iie/utils/ArangoDBConnect.java | 2 +- .../src/main/resources/application.properties | 6 +- .../src/main/resources/clickhouse.properties | 3 +- .../src/main/resources/log4j.properties | 24 ++ .../src/test/java/cn/ac/iie/TestArango.java | 2 +- 29 files changed, 1074 insertions(+), 664 deletions(-) create mode 100644 ip-learning-java-test/src/main/java/cn/ac/iie/dao/UpdateGraphData.java delete mode 100644 ip-learning-java-test/src/main/java/cn/ac/iie/etl/relationship/LocateFqdn2Ip.java delete mode 100644 ip-learning-java-test/src/main/java/cn/ac/iie/etl/update/Vertex.java delete mode 100644 ip-learning-java-test/src/main/java/cn/ac/iie/etl/vertex/Ip.java rename ip-learning-java-test/src/main/java/cn/ac/iie/{etl => service}/UpdateEFqdnAddressIp.java (98%) rename ip-learning-java-test/src/main/java/cn/ac/iie/{etl => service}/UpdateEIpVisitFqdn.java (98%) rename ip-learning-java-test/src/main/java/cn/ac/iie/{etl => service}/UpdateVFqdn.java (99%) rename ip-learning-java-test/src/main/java/cn/ac/iie/{etl => service}/UpdateVIP.java (98%) create mode 100644 ip-learning-java-test/src/main/java/cn/ac/iie/service/read/ReadClickhouseData.java rename ip-learning-java-test/src/main/java/cn/ac/iie/{etl => service}/read/ReadHistoryArangoData.java (50%) create mode 100644 ip-learning-java-test/src/main/java/cn/ac/iie/service/relationship/LocateFqdn2Ip.java create mode 100644 ip-learning-java-test/src/main/java/cn/ac/iie/service/relationship/LocateSubscriber2Ip.java rename ip-learning-java-test/src/main/java/cn/ac/iie/{etl => service}/relationship/VisitIp2Fqdn.java (55%) create mode 100644 ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Document.java rename ip-learning-java-test/src/main/java/cn/ac/iie/{etl => service}/update/Relationship.java (53%) create mode 100644 ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Vertex.java rename ip-learning-java-test/src/main/java/cn/ac/iie/{etl => service}/vertex/Fqdn.java (61%) create 
mode 100644 ip-learning-java-test/src/main/java/cn/ac/iie/service/vertex/Ip.java create mode 100644 ip-learning-java-test/src/main/java/cn/ac/iie/service/vertex/Subscriber.java delete mode 100644 ip-learning-java-test/src/main/java/cn/ac/iie/test/ReadArangoDBThreadTest.java create mode 100644 ip-learning-java-test/src/main/resources/log4j.properties diff --git a/ip-learning-java-test/pom.xml b/ip-learning-java-test/pom.xml index 73c4361..f124e52 100644 --- a/ip-learning-java-test/pom.xml +++ b/ip-learning-java-test/pom.xml @@ -10,6 +10,19 @@ + + + org.slf4j + slf4j-api + 1.7.21 + + + + org.slf4j + slf4j-log4j12 + 1.7.21 + + ru.yandex.clickhouse clickhouse-jdbc @@ -31,7 +44,7 @@ com.arangodb arangodb-java-driver - 4.2.2 + 6.6.3 diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseArangoData.java b/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseArangoData.java index c00523c..d90ee44 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseArangoData.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseArangoData.java @@ -1,39 +1,74 @@ package cn.ac.iie.dao; import cn.ac.iie.config.ApplicationConfig; -import cn.ac.iie.etl.read.ReadHistoryArangoData; +import cn.ac.iie.service.read.ReadHistoryArangoData; import cn.ac.iie.utils.ArangoDBConnect; +import cn.ac.iie.utils.ClickhouseConnect; import cn.ac.iie.utils.ExecutorThreadPool; import com.arangodb.ArangoCursor; import com.arangodb.entity.BaseDocument; import com.arangodb.entity.BaseEdgeDocument; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Enumeration; import java.util.concurrent.ConcurrentHashMap; +/** + * 获取arangoDB历史数据 + */ public class BaseArangoData { - public static ConcurrentHashMap v_Fqdn_Map = new ConcurrentHashMap<>(); - public static ConcurrentHashMap v_Ip_Map = new ConcurrentHashMap<>(); + private static final Logger LOG = LoggerFactory.getLogger(BaseArangoData.class); + + public static ConcurrentHashMap v_Fqdn_Map = new ConcurrentHashMap<>(); + public static ConcurrentHashMap v_Ip_Map = new ConcurrentHashMap<>(); + public static ConcurrentHashMap v_Subscriber_Map = new ConcurrentHashMap<>(); public static ConcurrentHashMap e_Fqdn_Address_Ip_Map = new ConcurrentHashMap<>(); public static ConcurrentHashMap e_Ip_Visit_Fqdn_Map = new ConcurrentHashMap<>(); + public static ConcurrentHashMap e_Subsciber_Locate_Ip_Map = new ConcurrentHashMap<>(); private static ArangoDBConnect arangoDBConnect = ArangoDBConnect.getInstance(); private static ExecutorThreadPool threadPool = ExecutorThreadPool.getInstance(); public void baseDocumentDataMap(){ - readHistoryData("FQDN", v_Fqdn_Map); - readHistoryData("IP", v_Ip_Map); - readHistoryData("R_LOCATE_FQDN2IP", e_Fqdn_Address_Ip_Map); - readHistoryData("R_VISIT_IP2FQDN", e_Ip_Visit_Fqdn_Map); + long startA = System.currentTimeMillis(); + readHistoryData("FQDN", v_Fqdn_Map,BaseDocument.class); + readHistoryData("IP", v_Ip_Map,BaseDocument.class); + readHistoryData("SUBSCRIBER",v_Subscriber_Map,BaseDocument.class); +// readHistoryData("R_LOCATE_FQDN2IP", e_Fqdn_Address_Ip_Map); +// readHistoryData("R_VISIT_IP2FQDN", e_Ip_Visit_Fqdn_Map); +// readHistoryData("R_LOCATE_SUBSCRIBER2IP",e_Subsciber_Locate_Ip_Map); threadPool.shutdown(); threadPool.awaitThreadTask(); + LOG.info("v_Fqdn_Map大小:"+v_Fqdn_Map.size()); + LOG.info("v_Ip_Map大小:"+v_Ip_Map.size()); + LOG.info("v_Subscriber_Map大小:"+v_Subscriber_Map.size()); + LOG.info("e_Fqdn_Address_Ip_Map大小:"+e_Fqdn_Address_Ip_Map.size()); + 
LOG.info("e_Ip_Visit_Fqdn_Map大小:"+e_Ip_Visit_Fqdn_Map.size()); + LOG.info("e_Subsciber_Locate_Ip_Map大小:"+e_Subsciber_Locate_Ip_Map.size()); + long lastA = System.currentTimeMillis(); + LOG.info("读取ArangoDb时间:"+(lastA - startA)); } - private void readHistoryData(String table, ConcurrentHashMap map){ + public static void main(String[] args) { + new BaseArangoData().readHistoryData("IP", v_Ip_Map,BaseDocument.class); + threadPool.shutdown(); + threadPool.awaitThreadTask(); + ArrayList baseEdgeDocuments = new ArrayList<>(); + Enumeration keys = v_Ip_Map.keys(); + arangoDBConnect.overwrite(baseEdgeDocuments,"IP"); + arangoDBConnect.clean(); + + } + + private void readHistoryData(String table, ConcurrentHashMap map, Class type){ try { long[] timeRange = getTimeRange(table); for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { String sql = getQuerySql(timeRange, i, table); - ReadHistoryArangoData readHistoryArangoData = new ReadHistoryArangoData(arangoDBConnect, sql, map); + ReadHistoryArangoData readHistoryArangoData = new ReadHistoryArangoData<>(arangoDBConnect, sql, map,type); threadPool.executor(readHistoryArangoData); } }catch (Exception e){ @@ -55,9 +90,9 @@ public class BaseArangoData { minTime = Long.parseLong(doc.getAttribute("min_time").toString()); } long lastTime = System.currentTimeMillis(); - System.out.println("查询最大最小时间用时:" + (lastTime - startTime)); + LOG.info(sql+"\n查询最大最小时间用时:" + (lastTime - startTime)); }else { - System.out.println("获取最大最小时间异常"); + LOG.warn("获取ArangoDb时间范围为空"); } }catch (Exception e){ e.printStackTrace(); @@ -75,5 +110,4 @@ public class BaseArangoData { return "FOR doc IN "+table+" filter doc.FIRST_FOUND_TIME >= "+minThreadTime+" and doc.FIRST_FOUND_TIME <= "+maxThreadTime+" RETURN doc"; } - } diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java b/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java index 9d084a1..b33b73c 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java @@ -1,287 +1,185 @@ package cn.ac.iie.dao; import cn.ac.iie.config.ApplicationConfig; -import cn.ac.iie.etl.UpdateEFqdnAddressIp; -import cn.ac.iie.etl.UpdateEIpVisitFqdn; -import cn.ac.iie.etl.UpdateVFqdn; -import cn.ac.iie.etl.UpdateVIP; import cn.ac.iie.utils.ClickhouseConnect; -import cn.ac.iie.utils.TopDomainUtils; import com.alibaba.druid.pool.DruidPooledConnection; import com.arangodb.entity.BaseDocument; import com.arangodb.entity.BaseEdgeDocument; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.sql.Connection; import java.sql.ResultSet; import java.sql.Statement; import java.util.ArrayList; import java.util.HashMap; +import static cn.ac.iie.service.read.ReadClickhouseData.*; + +/** + * 读取clickhouse数据,封装到map + * @author wlh + */ public class BaseClickhouseData { - private static final ClickhouseConnect manger = ClickhouseConnect.getInstance(); - private static HashMap>> vFqdnMap = new HashMap<>(); - private static HashMap> vIpMap = new HashMap<>(); - private static HashMap> eFqdnAddressIpMap = new HashMap<>(); - private static HashMap> eIpVisitFqdnMap = new HashMap<>(); - public Connection connection; - public Statement pstm; + private static final Logger LOG = LoggerFactory.getLogger(BaseClickhouseData.class); - public BaseClickhouseData(){} + private static ClickhouseConnect manger = ClickhouseConnect.getInstance(); + static HashMap>> vFqdnMap = new HashMap<>(); + static HashMap>> 
vIpMap = new HashMap<>(); + static HashMap>> vSubscriberMap = new HashMap<>(); + static HashMap>> eFqdnAddressIpMap = new HashMap<>(); + static HashMap>> eIpVisitFqdnMap = new HashMap<>(); + static HashMap>> eSubsciberLocateIpMap = new HashMap<>(); - private static long[] getTimeLimit(){ - long maxTime = ApplicationConfig.READ_CLICKHOUSE_MAX_TIME; - long minTime = ApplicationConfig.READ_CLICKHOUSE_MIN_TIME; - return new long[]{maxTime,minTime}; - } + private DruidPooledConnection connection; + private Statement statement; - static { - for (int i = 0;i < ApplicationConfig.THREAD_POOL_NUMBER;i++){ - vFqdnMap.put(i,new HashMap<>()); - } - System.out.println("V_FQDN resultMap初始化完成"); - for (int i = 0;i < ApplicationConfig.THREAD_POOL_NUMBER;i++){ - vIpMap.put(i,new HashMap<>()); - } - System.out.println("V_IP resultMap初始化完成"); - for (int i = 0;i < ApplicationConfig.THREAD_POOL_NUMBER;i++){ - eFqdnAddressIpMap.put(i,new HashMap<>()); - } - System.out.println("E_ADDRESS_V_FQDN_TO_V_IP resultMap初始化完成"); - for (int i = 0;i < ApplicationConfig.THREAD_POOL_NUMBER;i++){ - eIpVisitFqdnMap.put(i,new HashMap<>()); - } - System.out.println("E_VISIT_V_IP_TO_V_FQDN resultMap初始化完成"); - } - - public static void BaseVFqdn(){ - BaseVDomainFromReferer(); - long[] timeLimit = getTimeLimit(); - long maxTime = timeLimit[0]; - long minTime = timeLimit[1]; - String where = "recv_time >= "+minTime+" and recv_time <= "+maxTime+" and media_domain != '' "; - String sql = "SELECT media_domain AS FQDN_NAME,MIN(recv_time) AS FIRST_FOUND_TIME,MAX(recv_time) AS LAST_FOUND_TIME,COUNT( * ) AS FQDN_COUNT_TOTAL FROM media_expire_patch WHERE "+where+" GROUP BY media_domain"; - System.out.println(sql); + void baseVertexFqdn() { + initializeMap(vFqdnMap); + LOG.info("FQDN resultMap初始化完成"); + String sql = getVertexFqdnSql(); long start = System.currentTimeMillis(); try { - DruidPooledConnection connection = manger.getConnection(); - Statement statement = connection.createStatement(); + connection = manger.getConnection(); + statement = connection.createStatement(); ResultSet resultSet = statement.executeQuery(sql); - while (resultSet.next()){ - String fqdnName = resultSet.getString("FQDN_NAME"); - long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); - long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); - long fqdnCountTotal = resultSet.getLong("FQDN_COUNT_TOTAL"); - BaseDocument newDoc = new BaseDocument(); - newDoc.setKey(fqdnName); - newDoc.addAttribute("FQDN_NAME",fqdnName); - newDoc.addAttribute("FIRST_FOUND_TIME",firstFoundTime); - newDoc.addAttribute("LAST_FOUND_TIME",lastFoundTime); - newDoc.addAttribute("FQDN_COUNT_TOTAL",fqdnCountTotal); - int i = fqdnName.hashCode() % ApplicationConfig.THREAD_POOL_NUMBER; - HashMap> documentHashMap = vFqdnMap.getOrDefault(i, new HashMap<>()); - ArrayList documentArrayList = documentHashMap.getOrDefault(fqdnName, new ArrayList<>()); - documentArrayList.add(newDoc); - documentHashMap.put(fqdnName,documentArrayList); + while (resultSet.next()) { + BaseDocument newDoc = getVertexFqdnDocument(resultSet); + if (newDoc != null) { + putMapByHashcode(newDoc,vFqdnMap); + } } long last = System.currentTimeMillis(); - System.out.println("读取clickhouse v_FQDN时间:"+(last - start)); - for (int i = 0;i < ApplicationConfig.THREAD_POOL_NUMBER;i++){ - HashMap> baseDocumentHashMap = vFqdnMap.get(i); - UpdateVFqdn updateVFqdn = new UpdateVFqdn(baseDocumentHashMap); - updateVFqdn.run(); - } - }catch (Exception e){ - e.printStackTrace(); + LOG.info(sql + "\n读取clickhouse v_FQDN时间:" + (last - start)); 
+ } catch (Exception e) { + LOG.error(e.toString()); + }finally { + manger.clear(statement,connection); } } - private static void BaseVDomainFromReferer(){ - long[] timeLimit = getTimeLimit(); - long maxTime = timeLimit[0]; - long minTime = timeLimit[1]; - String where = "recv_time >= "+minTime+" and recv_time <= "+maxTime+" and s1_referer != '' "; - String sql = "SELECT s1_referer AS FQDN_NAME,MIN(recv_time) AS FIRST_FOUND_TIME,MAX(recv_time) AS LAST_FOUND_TIME,COUNT( * ) AS FQDN_COUNT_TOTAL FROM media_expire_patch WHERE "+where+" GROUP BY s1_referer"; - System.out.println(sql); + void baseVertexIp() { + initializeMap(vIpMap); + LOG.info("IP resultMap初始化完成"); + String sql = getVertexIpSql(); long start = System.currentTimeMillis(); try { - DruidPooledConnection connection = manger.getConnection(); - Statement statement = connection.createStatement(); + connection = manger.getConnection(); + statement = connection.createStatement(); ResultSet resultSet = statement.executeQuery(sql); - while (resultSet.next()){ - String referer = resultSet.getString("FQDN_NAME"); - String fqdnName = TopDomainUtils.getDomainFromUrl(referer); - long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); - long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); - long fqdnCountTotal = resultSet.getLong("FQDN_COUNT_TOTAL"); - BaseDocument newDoc = new BaseDocument(); - newDoc.setKey(fqdnName); - newDoc.addAttribute("FQDN_NAME",fqdnName); - newDoc.addAttribute("FIRST_FOUND_TIME",firstFoundTime); - newDoc.addAttribute("LAST_FOUND_TIME",lastFoundTime); - newDoc.addAttribute("FQDN_COUNT_TOTAL",fqdnCountTotal); - int i = fqdnName.hashCode() % ApplicationConfig.THREAD_POOL_NUMBER; - HashMap> documentHashMap = vFqdnMap.getOrDefault(i, new HashMap<>()); - ArrayList documentArrayList = documentHashMap.getOrDefault(fqdnName, new ArrayList<>()); - documentArrayList.add(newDoc); - documentHashMap.put(fqdnName,documentArrayList); + while (resultSet.next()) { + BaseDocument newDoc = getVertexIpDocument(resultSet); + putMapByHashcode(newDoc,vIpMap); } long last = System.currentTimeMillis(); - System.out.println("读取clickhouse v_FQDN时间:"+(last - start)); - }catch (Exception e){ - e.printStackTrace(); + LOG.info(sql + "\n读取clickhouse v_IP时间:" + (last - start)); + } catch (Exception e) { + LOG.error(e.toString()); + }finally { + manger.clear(statement,connection); } } - public static void BaseVIp(){ - long[] timeLimit = getTimeLimit(); - long maxTime = timeLimit[0]; - long minTime = timeLimit[1]; - String where = " recv_time >= "+minTime+" and recv_time <= "+maxTime; - String sql = "SELECT IP,location,MIN(recv_time) AS FIRST_FOUND_TIME,MAX(recv_time) AS LAST_FOUND_TIME,COUNT(*) AS IP_COUNT_TOTAL FROM(( SELECT s1_s_ip AS IP, s1_s_location_region AS location, recv_time FROM media_expire_patch where "+where+" ) UNION ALL ( SELECT s1_d_ip AS IP, s1_d_location_region AS location, recv_time FROM media_expire_patch where "+where+" )) GROUP BY IP,location"; - System.out.println(sql); + void baseVertexSubscriber(){ + initializeMap(vSubscriberMap); + LOG.info("SUBSCRIBER resultMap初始化完成"); + String sql = getVertexSubscriberSql(); long start = System.currentTimeMillis(); try { - DruidPooledConnection connection = manger.getConnection(); - Statement statement = connection.createStatement(); + connection = manger.getConnection(); + statement = connection.createStatement(); ResultSet resultSet = statement.executeQuery(sql); while (resultSet.next()){ - String ip = resultSet.getString("IP"); - String location = 
resultSet.getString("location"); - long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); - long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); - long ipCountTotal = resultSet.getLong("IP_COUNT_TOTAL"); - BaseDocument newDoc = new BaseDocument(); - newDoc.setKey(ip); - newDoc.addAttribute("IP",ip); - newDoc.addAttribute("IP_LOCATION",location); - newDoc.addAttribute("FIRST_FOUND_TIME",firstFoundTime); - newDoc.addAttribute("LAST_FOUND_TIME",lastFoundTime); - newDoc.addAttribute("IP_COUNT_TOTAL",ipCountTotal); - int i = ip.hashCode() % ApplicationConfig.THREAD_POOL_NUMBER; - HashMap documentHashMap = vIpMap.getOrDefault(i, new HashMap()); - documentHashMap.put(ip,newDoc); + BaseDocument newDoc = getVertexSubscriberDocument(resultSet); + putMapByHashcode(newDoc,vSubscriberMap); } long last = System.currentTimeMillis(); - System.out.println("读取clickhouse v_IP时间:"+(last - start)); - for (int i = 0;i < ApplicationConfig.THREAD_POOL_NUMBER;i++){ - HashMap baseDocumentHashMap = vIpMap.get(i); - UpdateVIP updateVIp = new UpdateVIP(baseDocumentHashMap); - updateVIp.run(); - } + LOG.info(sql + "\n读取clickhouse v_SUBSCRIBER时间:" + (last - start)); }catch (Exception e){ + LOG.error(sql + "\n读取clickhouse v_SUBSCRIBER失败"); e.printStackTrace(); + }finally { + manger.clear(statement,connection); } } - public static void BaseEFqdnAddressIp(){ - long[] timeLimit = getTimeLimit(); - long maxTime = timeLimit[0]; - long minTime = timeLimit[1]; - String where = "recv_time >= "+minTime+" and recv_time <= "+maxTime +" AND media_domain != '' AND s1_d_ip != '' "; - String sql = "SELECT media_domain AS V_FQDN,s1_d_ip AS V_IP,MIN( recv_time ) AS FIRST_FOUND_TIME,MAX( recv_time ) AS LAST_FOUND_TIME,COUNT( * ) AS COUNT_TOTAL FROM media_expire_patch WHERE "+where+" GROUP BY s1_d_ip,media_domain"; - System.out.println(sql); + void baseRelationshipSubscriberLocateIp(){ + initializeMap(eSubsciberLocateIpMap); + LOG.info("R_LOCATE_SUBSCRIBER2IP"); + String sql = getRelationshipSubsciberLocateIpSql(); long start = System.currentTimeMillis(); try { - DruidPooledConnection connection = manger.getConnection(); - Statement statement = connection.createStatement(); + connection = manger.getConnection(); + statement = connection.createStatement(); ResultSet resultSet = statement.executeQuery(sql); while (resultSet.next()){ - String vFqdn = resultSet.getString("V_FQDN"); - String vIp = resultSet.getString("V_IP"); - long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); - long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); - long countTotal = resultSet.getLong("COUNT_TOTAL"); - String key = vFqdn+"-"+vIp; - BaseEdgeDocument newDoc = new BaseEdgeDocument(); - newDoc.setKey(key); - newDoc.setFrom("V_FQDN/"+vFqdn); - newDoc.setTo("V_IP/"+vIp); - newDoc.addAttribute("FIRST_FOUND_TIME",firstFoundTime); - newDoc.addAttribute("LAST_FOUND_TIME",lastFoundTime); - newDoc.addAttribute("COUNT_TOTAL",countTotal); - int i = key.hashCode() % ApplicationConfig.THREAD_POOL_NUMBER; - HashMap documentHashMap = eFqdnAddressIpMap.getOrDefault(i, new HashMap()); - documentHashMap.put(key,newDoc); + BaseEdgeDocument newDoc = getRelationshipSubsciberLocateIpDocument(resultSet); + putMapByHashcode(newDoc,eSubsciberLocateIpMap); } long last = System.currentTimeMillis(); - System.out.println("读取clickhouse EFqdnAddressIp时间:"+(last - start)); - for (int i = 0;i < ApplicationConfig.THREAD_POOL_NUMBER;i++){ - HashMap baseDocumentHashMap = eFqdnAddressIpMap.get(i); - UpdateEFqdnAddressIp updateEFqdnAddressIp = new 
UpdateEFqdnAddressIp(baseDocumentHashMap); - updateEFqdnAddressIp.run(); - } + LOG.info(sql + "\n读取clickhouse ESubsciberLocateIp时间:" + (last - start)); }catch (Exception e){ + LOG.error(sql + "\n读取clickhouse ESubsciberLocateIp失败"); e.printStackTrace(); + }finally { + manger.clear(statement,connection); } } - public static void BaseEdgeFqdnSameFqdn(){ - long[] timeLimit = getTimeLimit(); - long maxTime = timeLimit[0]; - long minTime = timeLimit[1]; - String where = "recv_time >= "+minTime+" and recv_time <= "+maxTime +" AND media_domain != '' AND s1_d_ip != '' "; - String sql = "SELECT s1_domain AS V_FQDN,s1_referer,MIN(recv_time) AS FIRST_FOUND_TIME,MAX( recv_time ) AS LAST_FOUND_TIME,COUNT( * ) AS COUNT_TOTAL FROM media_expire_patch WHERE "+where+" GROUP BY s1_referer,s1_domain"; - System.out.println(sql); - try { - DruidPooledConnection connection = manger.getConnection(); - Statement statement = connection.createStatement(); - ResultSet resultSet = statement.executeQuery(sql); - while (resultSet.next()){ - String vFqdn = resultSet.getString("V_FQDN"); - String referer = resultSet.getString("s1_referer"); - String refererDomain = TopDomainUtils.getDomainFromUrl(referer); - long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); - long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); - long countTotal = resultSet.getLong("COUNT_TOTAL"); - String key = vFqdn+"-"+refererDomain; - - } - }catch (Exception e){ - e.printStackTrace(); - } - - } - - public static void BaseEIpVisitFqdn(){ - long[] timeLimit = getTimeLimit(); - long maxTime = timeLimit[0]; - long minTime = timeLimit[1]; - String where = "recv_time >= "+minTime+" and recv_time <= "+maxTime+" AND s1_s_ip != '' AND media_domain != '' "; - String sql = "SELECT s1_s_ip AS V_IP,media_domain AS V_FQDN,MIN( recv_time ) AS FIRST_FOUND_TIME,MAX( recv_time ) AS LAST_FOUND_TIME,COUNT( * ) AS COUNT_TOTAL FROM media_expire_patch WHERE "+where+" GROUP BY s1_s_ip,media_domain"; - System.out.println(sql); + void baseRelationshipFqdnAddressIp() { + initializeMap(eFqdnAddressIpMap); + LOG.info("R_LOCATE_FQDN2IP resultMap初始化完成"); + String sql = getRelationshipFqdnAddressIpSql(); long start = System.currentTimeMillis(); try { - DruidPooledConnection connection = manger.getConnection(); - Statement statement = connection.createStatement(); + connection = manger.getConnection(); + statement = connection.createStatement(); ResultSet resultSet = statement.executeQuery(sql); - while (resultSet.next()){ - String vIp = resultSet.getString("V_IP"); - String vFqdn = resultSet.getString("V_FQDN"); - long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); - long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); - long countTotal = resultSet.getLong("COUNT_TOTAL"); - String key = vIp +"-"+ vFqdn; - BaseEdgeDocument newDoc = new BaseEdgeDocument(); - newDoc.setKey(key); - newDoc.setFrom("V_IP/"+vIp); - newDoc.setTo("V_FQDN/"+vFqdn); - newDoc.addAttribute("FIRST_FOUND_TIME",firstFoundTime); - newDoc.addAttribute("LAST_FOUND_TIME",lastFoundTime); - newDoc.addAttribute("COUNT_TOTAL",countTotal); - int i = key.hashCode() % ApplicationConfig.THREAD_POOL_NUMBER; - HashMap documentHashMap = eIpVisitFqdnMap.getOrDefault(i, new HashMap()); - documentHashMap.put(key,newDoc); + + while (resultSet.next()) { + BaseEdgeDocument newDoc = getRelationFqdnAddressIpDocument(resultSet); + putMapByHashcode(newDoc,eFqdnAddressIpMap); } long last = System.currentTimeMillis(); - System.out.println("读取clickhouse EIpVisitFqdn时间:"+(last - start)); - for (int i = 0;i < 
ApplicationConfig.THREAD_POOL_NUMBER;i++){ - HashMap baseDocumentHashMap = eIpVisitFqdnMap.get(i); - UpdateEIpVisitFqdn updateEIpVisitFqdn = new UpdateEIpVisitFqdn(baseDocumentHashMap); - updateEIpVisitFqdn.run(); + LOG.info(sql + "\n读取clickhouse EFqdnAddressIp时间:" + (last - start)); + } catch (Exception e) { + LOG.error(e.toString()); + }finally { + manger.clear(statement,connection); + } + } + + void baseRelationshipIpVisitFqdn() { + initializeMap(eIpVisitFqdnMap); + LOG.info("R_VISIT_IP2FQDN resultMap初始化完成"); + String sql = getRelationshipIpVisitFqdnSql(); + long start = System.currentTimeMillis(); + try { + connection = manger.getConnection(); + statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery(sql); + while (resultSet.next()) { + BaseEdgeDocument newDoc = getRelationIpVisitFqdnDocument(resultSet); + putMapByHashcode(newDoc,eIpVisitFqdnMap); + } + long last = System.currentTimeMillis(); + LOG.info(sql + "\n读取clickhouse EIpVisitFqdn时间:" + (last - start)); + } catch (Exception e) { + LOG.error(e.toString()); + }finally { + manger.clear(statement,connection); + } + } + + private void initializeMap(HashMap>> map){ + try { + for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { + map.put(i, new HashMap<>()); } }catch (Exception e){ e.printStackTrace(); + LOG.error("初始化数据失败"); } } + } diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/dao/UpdateGraphData.java b/ip-learning-java-test/src/main/java/cn/ac/iie/dao/UpdateGraphData.java new file mode 100644 index 0000000..825543b --- /dev/null +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/dao/UpdateGraphData.java @@ -0,0 +1,157 @@ +package cn.ac.iie.dao; + +import cn.ac.iie.config.ApplicationConfig; +import cn.ac.iie.service.relationship.LocateFqdn2Ip; +import cn.ac.iie.service.relationship.LocateSubscriber2Ip; +import cn.ac.iie.service.relationship.VisitIp2Fqdn; +import cn.ac.iie.service.vertex.Fqdn; +import cn.ac.iie.service.vertex.Ip; +import cn.ac.iie.service.vertex.Subscriber; +import cn.ac.iie.utils.ArangoDBConnect; +import cn.ac.iie.utils.ExecutorThreadPool; +import com.arangodb.entity.BaseDocument; +import com.arangodb.entity.BaseEdgeDocument; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.concurrent.CountDownLatch; + +/** + * 更新图数据库业务类 + */ +public class UpdateGraphData { + private static final Logger LOG = LoggerFactory.getLogger(UpdateGraphData.class); + private static ExecutorThreadPool pool = ExecutorThreadPool.getInstance(); + private static ArangoDBConnect arangoManger = ArangoDBConnect.getInstance(); + + private CountDownLatch countDownLatch; + + public void updateArango(){ + long startC = System.currentTimeMillis(); + try { + BaseClickhouseData baseClickhouseData = new BaseClickhouseData(); + baseClickhouseData.baseVertexFqdn(); + updateVertexFqdn(); + + baseClickhouseData.baseVertexIp(); + updateVertexIp(); + +// baseClickhouseData.baseRelationshipFqdnAddressIp(); +// updateRelationFqdnAddressIp(); + +// baseClickhouseData.baseRelationshipIpVisitFqdn(); +// updateRelationIpVisitFqdn(); + + baseClickhouseData.baseVertexSubscriber(); + updateVertexSubscriber(); + +// baseClickhouseData.baseRelationshipSubscriberLocateIp(); +// updateRelationshipSubsciberLocateIp(); + }catch (Exception e){ + e.printStackTrace(); + }finally { + arangoManger.clean(); + } + long lastC = System.currentTimeMillis(); + LOG.info("更新ArangoDb时间:"+(lastC - startC)); + } + + private void updateVertexFqdn(){ + 
try { + countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); + for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { + HashMap> stringArrayListHashMap = BaseClickhouseData.vFqdnMap.get(i); + LOG.info("vFqdn baseDocumentHashMap大小:" + stringArrayListHashMap.size()); + Fqdn updateFqdn = new Fqdn(stringArrayListHashMap, arangoManger, "FQDN", BaseArangoData.v_Fqdn_Map,countDownLatch); + updateFqdn.run(); + } + countDownLatch.await(); + LOG.info("---------FQDN vertex 更新完毕---------"); + }catch (Exception e){ + e.printStackTrace(); + } + } + + private void updateVertexSubscriber(){ + try { + countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); + for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { + HashMap> stringArrayListHashMap = BaseClickhouseData.vSubscriberMap.get(i); + LOG.info("vSubscriber baseDocumentHashMap大小:" + stringArrayListHashMap.size()); + Subscriber updateSubscriber = new Subscriber(stringArrayListHashMap, arangoManger, "SUBSCRIBER", BaseArangoData.v_Subscriber_Map,countDownLatch); + updateSubscriber.run(); + } + countDownLatch.await(); + LOG.info("---------SUBSCRIBER vertex 更新完毕---------"); + }catch (Exception e){ + e.printStackTrace(); + } + } + + private void updateRelationshipSubsciberLocateIp(){ + try { + countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); + for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { + HashMap> baseDocumentHashMap = BaseClickhouseData.eSubsciberLocateIpMap.get(i); + LOG.info("ESubsciberLocateIp baseDocumentHashMap大小:" + baseDocumentHashMap.size()); + LocateSubscriber2Ip locateSubscriber2Ip = new LocateSubscriber2Ip(baseDocumentHashMap, arangoManger, "R_LOCATE_SUBSCRIBER2IP", BaseArangoData.e_Subsciber_Locate_Ip_Map, countDownLatch); + locateSubscriber2Ip.run(); + } + countDownLatch.await(); + LOG.info("------------R_LOCATE_SUBSCRIBER2IP relationship 更新完毕----------------"); + }catch (Exception e){ + e.printStackTrace(); + } + } + + private void updateVertexIp(){ + try { + countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); + for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { + HashMap> stringArrayListHashMap = BaseClickhouseData.vIpMap.get(i); + LOG.info("vIp baseDocumentHashMap大小:" + stringArrayListHashMap.size()); + Ip updateIp = new Ip(stringArrayListHashMap, arangoManger, "IP", BaseArangoData.v_Ip_Map, countDownLatch); + updateIp.run(); + } + countDownLatch.await(); + LOG.info("----------IP vertex 更新完毕-------------"); + }catch (Exception e){ + e.printStackTrace(); + } + } + + private void updateRelationFqdnAddressIp(){ + try { + countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); + for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { + HashMap> baseDocumentHashMap = BaseClickhouseData.eFqdnAddressIpMap.get(i); + LOG.info("EFqdnAddressIp baseDocumentHashMap大小:" + baseDocumentHashMap.size()); + LocateFqdn2Ip fqdnAddressIp = new LocateFqdn2Ip(baseDocumentHashMap, arangoManger, "R_LOCATE_FQDN2IP", BaseArangoData.e_Fqdn_Address_Ip_Map, countDownLatch); + fqdnAddressIp.run(); + } + countDownLatch.await(); + LOG.info("------------R_LOCATE_FQDN2IP relationship 更新完毕----------------"); + }catch (Exception e){ + e.printStackTrace(); + } + } + + private void updateRelationIpVisitFqdn(){ + try { + countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); + for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { + HashMap> baseDocumentHashMap = 
BaseClickhouseData.eIpVisitFqdnMap.get(i); + LOG.info("EIpVisitFqdn baseDocumentHashMap大小:" + baseDocumentHashMap.size()); + VisitIp2Fqdn ipVisitFqdn = new VisitIp2Fqdn(baseDocumentHashMap,arangoManger,"R_VISIT_IP2FQDN",BaseArangoData.e_Ip_Visit_Fqdn_Map,countDownLatch); + ipVisitFqdn.run(); + } + countDownLatch.await(); + LOG.info("---------------R_VISIT_IP2FQDN ralationship 更新完毕----------------"); + }catch (Exception e){ + e.printStackTrace(); + } + } + +} diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/relationship/LocateFqdn2Ip.java b/ip-learning-java-test/src/main/java/cn/ac/iie/etl/relationship/LocateFqdn2Ip.java deleted file mode 100644 index 3988096..0000000 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/relationship/LocateFqdn2Ip.java +++ /dev/null @@ -1,31 +0,0 @@ -package cn.ac.iie.etl.relationship; - -import cn.ac.iie.etl.update.Relationship; -import cn.ac.iie.utils.ArangoDBConnect; -import com.arangodb.entity.BaseEdgeDocument; - -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -public class LocateFqdn2Ip extends Relationship { - - public LocateFqdn2Ip(HashMap> newDocumentHashMap, - ArangoDBConnect arangoManger, - String collectionName, - ConcurrentHashMap historyDocumentMap) { - super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap); - } - - @Override - protected void mergeFunction(Map properties, BaseEdgeDocument schemaEdgeDoc){ - super.mergeFunction(properties,schemaEdgeDoc); - super.mergeDistinctClientIp(properties,schemaEdgeDoc); - } - - @Override - protected void updateFunction(BaseEdgeDocument newEdgeDocument, BaseEdgeDocument historyEdgeDocument) { - super.updateFunction(newEdgeDocument, historyEdgeDocument); - super.updateDistinctClientIp(newEdgeDocument, historyEdgeDocument); - } -} diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/update/Vertex.java b/ip-learning-java-test/src/main/java/cn/ac/iie/etl/update/Vertex.java deleted file mode 100644 index 88da292..0000000 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/update/Vertex.java +++ /dev/null @@ -1,116 +0,0 @@ -package cn.ac.iie.etl.update; - -import cn.ac.iie.config.ApplicationConfig; -import cn.ac.iie.utils.ArangoDBConnect; -import com.arangodb.entity.BaseDocument; -import com.arangodb.entity.BaseEdgeDocument; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - -/** - * @author wlh - * 多线程更新vertex数据 - */ -public class Vertex extends Thread{ - - protected HashMap> newDocumentHashMap; - protected ArangoDBConnect arangoManger; - protected String collectionName; - protected ConcurrentHashMap historyDocumentMap; - - public Vertex(HashMap> newDocumentHashMap, - ArangoDBConnect arangoManger, - String collectionName, - ConcurrentHashMap historyDocumentMap){ - this.newDocumentHashMap = newDocumentHashMap; - this.arangoManger = arangoManger; - this.collectionName = collectionName; - this.historyDocumentMap = historyDocumentMap; - } - - @Override - public void run() { - Set keySet = newDocumentHashMap.keySet(); - ArrayList docInsert = new ArrayList<>(); - int i = 0; - try { - for (String key:keySet){ - ArrayList documentArrayList = newDocumentHashMap.getOrDefault(key, null); - BaseDocument newDocument = mergeVertex(documentArrayList); - if (newDocument != null){ - i += 1; - BaseDocument historyDocument = historyDocumentMap.getOrDefault(key, null); - updateVertex(newDocument,historyDocument,docInsert); - } - 
if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH){ - arangoManger.overwrite(docInsert,collectionName); - System.out.println("更新"+i); - i = 0; - } - } - if (i != 0){ - arangoManger.overwrite(docInsert,collectionName); - System.out.println("更新"+i); - } - }catch (Exception e){ - e.printStackTrace(); - } - } - - private void updateVertex(BaseDocument newDocument,BaseDocument historyDocument,ArrayList docInsert){ - if (historyDocument != null){ - updateFunction(newDocument,historyDocument); - docInsert.add(historyDocument); - }else { - docInsert.add(newDocument); - } - } - - protected void updateFunction(BaseDocument newDocument,BaseDocument historyDocument){ - updateFoundTime(newDocument,historyDocument); - } - - private void updateFoundTime(BaseDocument newDocument,BaseDocument historyDocument){ - Object lastFoundTime = newDocument.getAttribute("LAST_FOUND_TIME"); - historyDocument.addAttribute("LAST_FOUND_TIME",lastFoundTime); - } - - private BaseDocument mergeVertex(ArrayList documentArrayList){ - if (documentArrayList == null || documentArrayList.isEmpty()){ - return null; - }else if (documentArrayList.size() == 1){ - return documentArrayList.get(0); - }else { - BaseDocument document = new BaseDocument(); - Map properties = document.getProperties(); - for (BaseDocument doc:documentArrayList){ - if (properties.isEmpty()){ - document = doc; - properties = doc.getProperties(); - }else { - mergeFunction(properties,doc); - } - } - document.setProperties(properties); - return document; - } - } - - protected void mergeFunction(Map properties,BaseDocument doc){ - mergeFoundTime(properties,doc); - } - - private void mergeFoundTime(Map properties,BaseDocument doc){ - long firstFoundTime = Long.parseLong(properties.getOrDefault("FIRST_FOUND_TIME", 0L).toString()); - long docFirstFoundTime = Long.parseLong(doc.getAttribute("FIRST_FOUND_TIME").toString()); - properties.put("FIRST_FOUND_TIME",firstFoundTime<docFirstFoundTime?firstFoundTime:docFirstFoundTime); - long lastFoundTime = Long.parseLong(properties.getOrDefault("LAST_FOUND_TIME", 0L).toString()); - long docLastFoundTime = Long.parseLong(doc.getAttribute("LAST_FOUND_TIME").toString()); - properties.put("LAST_FOUND_TIME",lastFoundTime>docLastFoundTime? lastFoundTime:docLastFoundTime); - } -} diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/vertex/Ip.java b/ip-learning-java-test/src/main/java/cn/ac/iie/etl/vertex/Ip.java deleted file mode 100644 index e04cd96..0000000 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/vertex/Ip.java +++ /dev/null @@ -1,20 +0,0 @@ -package cn.ac.iie.etl.vertex; - -import cn.ac.iie.etl.update.Vertex; -import cn.ac.iie.utils.ArangoDBConnect; -import com.arangodb.entity.BaseDocument; -import com.arangodb.entity.BaseEdgeDocument; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.concurrent.ConcurrentHashMap; - -public class Ip extends Vertex { - - public Ip(HashMap> newDocumentHashMap, - ArangoDBConnect arangoManger, - String collectionName, - ConcurrentHashMap historyDocumentMap) { - super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap); - } -} diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/UpdateEFqdnAddressIp.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateEFqdnAddressIp.java similarity index 98% rename from ip-learning-java-test/src/main/java/cn/ac/iie/etl/UpdateEFqdnAddressIp.java rename to ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateEFqdnAddressIp.java index 0abbc05..7b11692 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/UpdateEFqdnAddressIp.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateEFqdnAddressIp.java @@ -1,4 +1,4 @@ -package cn.ac.iie.etl; +package cn.ac.iie.service; import cn.ac.iie.config.ApplicationConfig; import cn.ac.iie.dao.BaseArangoData; diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/UpdateEIpVisitFqdn.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateEIpVisitFqdn.java similarity index 98% rename from ip-learning-java-test/src/main/java/cn/ac/iie/etl/UpdateEIpVisitFqdn.java rename to ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateEIpVisitFqdn.java index a07dadf..7a0ddf2 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/UpdateEIpVisitFqdn.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateEIpVisitFqdn.java @@ -1,4 +1,4 @@ -package cn.ac.iie.etl; +package cn.ac.iie.service; import cn.ac.iie.config.ApplicationConfig; import cn.ac.iie.dao.BaseArangoData; diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/UpdateVFqdn.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateVFqdn.java similarity index 99% rename from ip-learning-java-test/src/main/java/cn/ac/iie/etl/UpdateVFqdn.java rename to ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateVFqdn.java index 037dc40..c49aec4 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/UpdateVFqdn.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateVFqdn.java @@ -1,4 +1,4 @@ -package cn.ac.iie.etl; +package cn.ac.iie.service; import cn.ac.iie.config.ApplicationConfig; import cn.ac.iie.dao.BaseArangoData; diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/UpdateVIP.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateVIP.java similarity index 98% rename from ip-learning-java-test/src/main/java/cn/ac/iie/etl/UpdateVIP.java rename to ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateVIP.java index 12a906a..08fff78 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/UpdateVIP.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateVIP.java @@ -1,4 +1,4 @@ -package cn.ac.iie.etl; +package
cn.ac.iie.service; import cn.ac.iie.config.ApplicationConfig; diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/read/ReadClickhouseData.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/read/ReadClickhouseData.java new file mode 100644 index 0000000..7c14b63 --- /dev/null +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/read/ReadClickhouseData.java @@ -0,0 +1,251 @@ +package cn.ac.iie.service.read; + +import cn.ac.iie.config.ApplicationConfig; +import com.arangodb.entity.BaseDocument; +import com.arangodb.entity.BaseEdgeDocument; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.regex.Pattern; + +/** + * @author wlh + */ +public class ReadClickhouseData { + + public static long currentHour = System.currentTimeMillis() / (60 * 60 * 1000) * 60 * 60; + + private static Pattern pattern = Pattern.compile("^[\\d]*$"); + private static final Logger LOG = LoggerFactory.getLogger(ReadClickhouseData.class); + + public static BaseDocument getVertexFqdnDocument(ResultSet resultSet) throws SQLException { + String fqdnName = resultSet.getString("FQDN"); + BaseDocument newDoc = null; + if (isDomain(fqdnName)) { + long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); + long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); + newDoc = new BaseDocument(); + newDoc.setKey(fqdnName); + newDoc.addAttribute("FQDN_NAME", fqdnName); + newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime); + newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime); + } + return newDoc; + } + + public static BaseDocument getVertexIpDocument(ResultSet resultSet) throws SQLException { + BaseDocument newDoc = new BaseDocument(); + String ip = resultSet.getString("IP"); + long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); + long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); + long sessionCount = resultSet.getLong("SESSION_COUNT"); + long bytesSum = resultSet.getLong("BYTES_SUM"); + String ipType = resultSet.getString("ip_type"); + newDoc.setKey(ip); + newDoc.addAttribute("IP", ip); + newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime); + newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime); + switch (ipType) { + case "client": + newDoc.addAttribute("CLIENT_SESSION_COUNT", sessionCount); + newDoc.addAttribute("CLIENT_BYTES_SUM", bytesSum); + newDoc.addAttribute("SERVER_SESSION_COUNT", 0L); + newDoc.addAttribute("SERVER_BYTES_SUM", 0L); + break; + case "server": + newDoc.addAttribute("SERVER_SESSION_COUNT", sessionCount); + newDoc.addAttribute("SERVER_BYTES_SUM", bytesSum); + newDoc.addAttribute("CLIENT_SESSION_COUNT", 0L); + newDoc.addAttribute("CLIENT_BYTES_SUM", 0L); + break; + default: + } + newDoc.addAttribute("COMMON_LINK_INFO", ""); + return newDoc; + } + + public static BaseDocument getVertexSubscriberDocument(ResultSet resultSet) throws SQLException { + String subscriberId = resultSet.getString("common_subscriber_id"); + long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); + long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); + BaseDocument newDoc = new BaseDocument(); + newDoc.setKey(subscriberId); + newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime); + newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime); + return newDoc; + } + + public static BaseEdgeDocument getRelationshipSubsciberLocateIpDocument(ResultSet resultSet) throws SQLException { + String subscriberId = 
resultSet.getString("common_subscriber_id"); + String framedIp = resultSet.getString("radius_framed_ip"); + long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); + long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); + long countTotal = resultSet.getLong("COUNT_TOTAL"); + + String key = subscriberId + "-" + framedIp; + BaseEdgeDocument newDoc = new BaseEdgeDocument(); + newDoc.setKey(key); + newDoc.setFrom("SUBSCRIBER/" + subscriberId); + newDoc.setTo("IP/" + framedIp); + newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime); + newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime); + newDoc.addAttribute("COUNT_TOTAL", countTotal); + + return newDoc; + + } + + public static BaseEdgeDocument getRelationFqdnAddressIpDocument(ResultSet resultSet) throws SQLException { + String vFqdn = resultSet.getString("FQDN"); + BaseEdgeDocument newDoc = null; + if (isDomain(vFqdn)) { + String vIp = resultSet.getString("common_server_ip"); + long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); + long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); + long countTotal = resultSet.getLong("COUNT_TOTAL"); + String[] distCipRecents = (String[]) resultSet.getArray("DIST_CIP_RECENT").getArray(); + long[] clientIpTs = new long[distCipRecents.length]; + for (int i = 0; i < clientIpTs.length; i++) { + clientIpTs[i] = currentHour; + } + + String key = vFqdn + "-" + vIp; + newDoc = new BaseEdgeDocument(); + newDoc.setKey(key); + newDoc.setFrom("FQDN/" + vFqdn); + newDoc.setTo("IP/" + vIp); + newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime); + newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime); + newDoc.addAttribute("COUNT_TOTAL", countTotal); + newDoc.addAttribute("DIST_CIP", distCipRecents); + newDoc.addAttribute("DIST_CIP_TS", clientIpTs); + + } + return newDoc; + } + + public static BaseEdgeDocument getRelationIpVisitFqdnDocument(ResultSet resultSet) throws SQLException { + BaseEdgeDocument newDoc = null; + String vFqdn = resultSet.getString("FQDN"); + if (isDomain(vFqdn)) { + String vIp = resultSet.getString("common_client_ip"); + String key = vIp + "-" + vFqdn; + long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); + long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); + long countTotal = resultSet.getLong("COUNT_TOTAL"); + + newDoc = new BaseEdgeDocument(); + newDoc.setKey(key); + newDoc.setFrom("IP/" + vIp); + newDoc.setTo("FQDN/" + vFqdn); + newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime); + newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime); + newDoc.addAttribute("COUNT_TOTAL", countTotal); + } + return newDoc; + } + + public static void putMapByHashcode(T newDoc, HashMap>> map){ + if (newDoc != null) { + String key = newDoc.getKey(); + int i = Math.abs(key.hashCode()) % ApplicationConfig.THREAD_POOL_NUMBER; + HashMap> documentHashMap = map.getOrDefault(i, new HashMap<>()); + ArrayList documentArrayList = documentHashMap.getOrDefault(key, new ArrayList<>()); + documentArrayList.add(newDoc); + documentHashMap.put(key,documentArrayList); + } + } + + private static boolean isDomain(String fqdn) { + try { + String[] fqdnArr = fqdn.split("\\."); + if (fqdnArr.length < 4 || fqdnArr.length > 4) { + return true; + } + + for (String f : fqdnArr) { + if (pattern.matcher(f).matches()) { + int i = Integer.parseInt(f); + if (i < 0 || i > 255) { + return true; + } + } else { + return true; + } + } + } catch (Exception e) { + LOG.error("解析域名 " + fqdn + " 失败:\n" + e.toString()); + } + return false; + } + + public static String getVertexFqdnSql() { + long[] 
timeLimit = getTimeLimit(); + long maxTime = timeLimit[0]; + long minTime = timeLimit[1]; + String where = "common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime; + String sslSql = "SELECT ssl_sni AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'SSL' GROUP BY ssl_sni"; + String httpSql = "SELECT http_host AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'HTTP' GROUP BY http_host"; + return "SELECT FQDN,MAX( LAST_FOUND_TIME ) AS LAST_FOUND_TIME,MIN( FIRST_FOUND_TIME ) AS FIRST_FOUND_TIME FROM ((" + sslSql + ") UNION ALL (" + httpSql + ")) GROUP BY FQDN HAVING FQDN != ''"; + } + + public static String getVertexIpSql() { + long[] timeLimit = getTimeLimit(); + long maxTime = timeLimit[0]; + long minTime = timeLimit[1]; + String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime; + String clientIpSql = "SELECT common_client_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_c2s_byte_num) as BYTES_SUM,'client' as ip_type FROM tsg_galaxy_v3.connection_record_log where " + where + " group by IP"; + String serverIpSql = "SELECT common_server_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_s2c_byte_num) as BYTES_SUM,'server' as ip_type FROM tsg_galaxy_v3.connection_record_log where " + where + " group by IP"; + String frameIpSql = ""; + return "SELECT * FROM((" + clientIpSql + ") UNION ALL (" + serverIpSql + "))"; + } + + public static String getRelationshipFqdnAddressIpSql() { + long[] timeLimit = getTimeLimit(); + long maxTime = timeLimit[0]; + long minTime = timeLimit[1]; + String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime; + String sslSql = "SELECT ssl_sni AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,groupUniqArray(100)(common_client_ip) AS DIST_CIP_RECENT,'SSL' AS common_schema_type FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'SSL' GROUP BY ssl_sni,common_server_ip"; + String httpSql = "SELECT http_host AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,groupUniqArray(100)(common_client_ip) AS DIST_CIP_RECENT,'HTTP' AS common_schema_type FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'HTTP' GROUP BY http_host,common_server_ip"; + return "SELECT * FROM ((" + sslSql + ") UNION ALL (" + httpSql + "))WHERE FQDN != ''"; + } + + public static String getRelationshipIpVisitFqdnSql() { + long[] timeLimit = getTimeLimit(); + long maxTime = timeLimit[0]; + long minTime = timeLimit[1]; + String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime; + String httpSql = "SELECT http_host AS FQDN,common_client_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,'HTTP' AS common_schema_type FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'HTTP' GROUP BY http_host,common_client_ip"; + String sslSql = "SELECT ssl_sni AS 
FQDN,common_client_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,'SSL' AS common_schema_type FROM tsg_galaxy_v3.connection_record_log WHERE common_schema_type = 'SSL' GROUP BY ssl_sni,common_client_ip"; + return "SELECT * FROM ((" + sslSql + ") UNION ALL (" + httpSql + "))WHERE FQDN != ''"; + } + + public static String getVertexSubscriberSql() { + long[] timeLimit = getTimeLimit(); + long maxTime = timeLimit[0]; + long minTime = timeLimit[1]; + String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime + " AND common_subscriber_id != '' AND radius_packet_type = 4 AND radius_acct_status_type = 1"; + return "SELECT common_subscriber_id,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME FROM radius_record_log WHERE" + where + " GROUP BY common_subscriber_id"; + } + + public static String getRelationshipSubsciberLocateIpSql() { + long[] timeLimit = getTimeLimit(); + long maxTime = timeLimit[0]; + long minTime = timeLimit[1]; + String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime + " AND common_subscriber_id != '' AND radius_framed_ip != '' AND radius_packet_type = 4 AND radius_acct_status_type = 1"; + return "SELECT common_subscriber_id,radius_framed_ip,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME,COUNT(*) as COUNT_TOTAL FROM radius_record_log WHERE" + where + " GROUP BY common_subscriber_id,radius_framed_ip"; + } + + private static long[] getTimeLimit() { +// long maxTime = currentHour; +// long minTime = maxTime - 3600; + long maxTime = ApplicationConfig.READ_CLICKHOUSE_MAX_TIME; + long minTime = ApplicationConfig.READ_CLICKHOUSE_MIN_TIME; + return new long[]{maxTime, minTime}; + } + +} diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/read/ReadHistoryArangoData.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java similarity index 50% rename from ip-learning-java-test/src/main/java/cn/ac/iie/etl/read/ReadHistoryArangoData.java rename to ip-learning-java-test/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java index 971b29b..138bae3 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/read/ReadHistoryArangoData.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java @@ -1,9 +1,14 @@ -package cn.ac.iie.etl.read; +package cn.ac.iie.service.read; import cn.ac.iie.utils.ArangoDBConnect; import com.arangodb.ArangoCursor; +import com.arangodb.entity.BaseDocument; import com.arangodb.entity.BaseEdgeDocument; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; import java.util.List; import java.util.concurrent.ConcurrentHashMap; @@ -11,34 +16,35 @@ import java.util.concurrent.ConcurrentHashMap; * @author wlh * 多线程全量读取arangoDb历史数据,封装到map */ -public class ReadHistoryArangoData extends Thread { +public class ReadHistoryArangoData extends Thread { + private static final Logger LOG = LoggerFactory.getLogger(ReadHistoryArangoData.class); + private ArangoDBConnect arangoDBConnect; private String query; - private ConcurrentHashMap map; + private ConcurrentHashMap map; + private Class type; - public ReadHistoryArangoData(ArangoDBConnect arangoDBConnect, String query, ConcurrentHashMap map) { + public ReadHistoryArangoData(ArangoDBConnect arangoDBConnect, String query, ConcurrentHashMap map,Class type) { 
this.arangoDBConnect = arangoDBConnect; this.query = query; this.map = map; + this.type = type; } @Override public void run() { - String name = Thread.currentThread().getName(); - System.out.println(name + ":" + query); long s = System.currentTimeMillis(); - ArangoCursor docs = arangoDBConnect.executorQuery(query, BaseEdgeDocument.class); + ArangoCursor docs = arangoDBConnect.executorQuery(query, type); if (docs != null){ - List baseDocuments = docs.asListRemaining(); + List baseDocuments = docs.asListRemaining(); int i = 0; - for (BaseEdgeDocument doc : baseDocuments) { + for (T doc : baseDocuments) { String key = doc.getKey(); map.put(key, doc); i++; } - System.out.println(name + ":共处理数据" + i); long l = System.currentTimeMillis(); - System.out.println(name + "运行时间:" + (l - s)); + LOG.info(query+ "\n处理数据" + i + "条,运行时间:" + (l - s)); } } } diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/relationship/LocateFqdn2Ip.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/relationship/LocateFqdn2Ip.java new file mode 100644 index 0000000..f9e3b88 --- /dev/null +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/relationship/LocateFqdn2Ip.java @@ -0,0 +1,76 @@ +package cn.ac.iie.service.relationship; + +import cn.ac.iie.service.read.ReadClickhouseData; +import cn.ac.iie.service.update.Relationship; +import cn.ac.iie.utils.ArangoDBConnect; +import com.arangodb.entity.BaseEdgeDocument; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; + +public class LocateFqdn2Ip extends Relationship { + + public LocateFqdn2Ip(HashMap> newDocumentHashMap, + ArangoDBConnect arangoManger, + String collectionName, + ConcurrentHashMap historyDocumentMap, + CountDownLatch countDownLatch) { + super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap,countDownLatch); + } + + @Override + protected void mergeFunction(Map properties, BaseEdgeDocument schemaEdgeDoc){ + super.mergeFunction(properties,schemaEdgeDoc); + } + + @Override + protected void updateFunction(BaseEdgeDocument newEdgeDocument, BaseEdgeDocument historyEdgeDocument) { + super.updateFunction(newEdgeDocument, historyEdgeDocument); + updateDistinctClientIp(newEdgeDocument, historyEdgeDocument); + } + + private void updateDistinctClientIp(BaseEdgeDocument newEdgeDocument,BaseEdgeDocument edgeDocument){ + ArrayList distCip = (ArrayList) edgeDocument.getAttribute("DIST_CIP"); + ArrayList distCipTs = (ArrayList) edgeDocument.getAttribute("DIST_CIP_TS"); + HashMap distCipToTs = new HashMap<>(); + if (distCip.size() == distCipTs.size()){ + for (int i = 0;i < distCip.size();i++){ + distCipToTs.put(distCip.get(i),distCipTs.get(i)); + } + } + Object[] distCipRecent = (Object[])newEdgeDocument.getAttribute("DIST_CIP"); + for (Object cip:distCipRecent){ + distCipToTs.put(cip.toString(), ReadClickhouseData.currentHour); + } + + Map sortDistCip = sortMapByValue(distCipToTs); + edgeDocument.addAttribute("DIST_CIP",sortDistCip.keySet().toArray()); + edgeDocument.addAttribute("DIST_CIP_TS",sortDistCip.values().toArray()); + } + + + /** + * 使用 Map按value进行排序 + */ + private Map sortMapByValue(Map oriMap) { + if (oriMap == null || oriMap.isEmpty()) { + return null; + } + Map sortedMap = new LinkedHashMap<>(); + List> entryList = new ArrayList<>(oriMap.entrySet()); + entryList.sort((o1, o2) -> o2.getValue().compareTo(o1.getValue())); + + if(entryList.size() > 100){ + for(Map.Entry set:entryList.subList(0, 100)){ + sortedMap.put(set.getKey(), set.getValue()); + } + 
diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/relationship/LocateFqdn2Ip.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/relationship/LocateFqdn2Ip.java
new file mode 100644
index 0000000..f9e3b88
--- /dev/null
+++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/relationship/LocateFqdn2Ip.java
@@ -0,0 +1,76 @@
+package cn.ac.iie.service.relationship;
+
+import cn.ac.iie.service.read.ReadClickhouseData;
+import cn.ac.iie.service.update.Relationship;
+import cn.ac.iie.utils.ArangoDBConnect;
+import com.arangodb.entity.BaseEdgeDocument;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+
+public class LocateFqdn2Ip extends Relationship {
+
+    public LocateFqdn2Ip(HashMap<String, ArrayList<BaseEdgeDocument>> newDocumentHashMap,
+                         ArangoDBConnect arangoManger,
+                         String collectionName,
+                         ConcurrentHashMap<String, BaseEdgeDocument> historyDocumentMap,
+                         CountDownLatch countDownLatch) {
+        super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap, countDownLatch);
+    }
+
+    @Override
+    protected void mergeFunction(Map<String, Object> properties, BaseEdgeDocument schemaEdgeDoc){
+        super.mergeFunction(properties, schemaEdgeDoc);
+    }
+
+    @Override
+    protected void updateFunction(BaseEdgeDocument newEdgeDocument, BaseEdgeDocument historyEdgeDocument) {
+        super.updateFunction(newEdgeDocument, historyEdgeDocument);
+        updateDistinctClientIp(newEdgeDocument, historyEdgeDocument);
+    }
+
+    private void updateDistinctClientIp(BaseEdgeDocument newEdgeDocument, BaseEdgeDocument edgeDocument){
+        ArrayList<String> distCip = (ArrayList<String>) edgeDocument.getAttribute("DIST_CIP");
+        ArrayList<Long> distCipTs = (ArrayList<Long>) edgeDocument.getAttribute("DIST_CIP_TS");
+        HashMap<String, Long> distCipToTs = new HashMap<>();
+        if (distCip.size() == distCipTs.size()){
+            for (int i = 0; i < distCip.size(); i++){
+                distCipToTs.put(distCip.get(i), distCipTs.get(i));
+            }
+        }
+        Object[] distCipRecent = (Object[]) newEdgeDocument.getAttribute("DIST_CIP");
+        for (Object cip : distCipRecent){
+            distCipToTs.put(cip.toString(), ReadClickhouseData.currentHour);
+        }
+
+        Map<String, Long> sortDistCip = sortMapByValue(distCipToTs);
+        edgeDocument.addAttribute("DIST_CIP", sortDistCip.keySet().toArray());
+        edgeDocument.addAttribute("DIST_CIP_TS", sortDistCip.values().toArray());
+    }
+
+    /**
+     * Sort the map by value (newest timestamp first) and keep at most the top 100 entries.
+     */
+    private Map<String, Long> sortMapByValue(Map<String, Long> oriMap) {
+        if (oriMap == null || oriMap.isEmpty()) {
+            return null;
+        }
+        Map<String, Long> sortedMap = new LinkedHashMap<>();
+        List<Map.Entry<String, Long>> entryList = new ArrayList<>(oriMap.entrySet());
+        entryList.sort((o1, o2) -> o2.getValue().compareTo(o1.getValue()));
+
+        if (entryList.size() > 100){
+            for (Map.Entry<String, Long> set : entryList.subList(0, 100)){
+                sortedMap.put(set.getKey(), set.getValue());
+            }
+        } else {
+            for (Map.Entry<String, Long> set : entryList){
+                sortedMap.put(set.getKey(), set.getValue());
+            }
+        }
+        return sortedMap;
+    }
+
+}
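Note: sortMapByValue above is a top-100 trim with insertion order preserved: sort entries by timestamp descending, keep at most 100, and rebuild a LinkedHashMap so the DIST_CIP and DIST_CIP_TS arrays stay aligned index-for-index. The same idea as a standalone stream-based sketch (names and sample values are illustrative, not from the repo):

    import java.util.*;
    import java.util.stream.Collectors;

    class TopNByValue {
        // keep the n entries with the largest values, largest first
        static Map<String, Long> topN(Map<String, Long> byTimestamp, int n) {
            return byTimestamp.entrySet().stream()
                    .sorted(Map.Entry.<String, Long>comparingByValue(Comparator.reverseOrder()))
                    .limit(n)
                    // LinkedHashMap keeps the sorted order, like the original
                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
                            (a, b) -> a, LinkedHashMap::new));
        }

        public static void main(String[] args) {
            Map<String, Long> m = new HashMap<>();
            m.put("10.0.0.1", 1594809000L);
            m.put("10.0.0.2", 1594805400L);
            m.put("10.0.0.3", 1594808000L);
            System.out.println(topN(m, 2).keySet()); // [10.0.0.1, 10.0.0.3]
        }
    }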
diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/relationship/LocateSubscriber2Ip.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/relationship/LocateSubscriber2Ip.java
new file mode 100644
index 0000000..c9b63db
--- /dev/null
+++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/relationship/LocateSubscriber2Ip.java
@@ -0,0 +1,32 @@
+package cn.ac.iie.service.relationship;
+
+import cn.ac.iie.service.update.Relationship;
+import cn.ac.iie.utils.ArangoDBConnect;
+import com.arangodb.entity.BaseEdgeDocument;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+
+public class LocateSubscriber2Ip extends Relationship {
+
+    public LocateSubscriber2Ip(HashMap<String, ArrayList<BaseEdgeDocument>> newDocumentHashMap,
+                               ArangoDBConnect arangoManger,
+                               String collectionName,
+                               ConcurrentHashMap<String, BaseEdgeDocument> historyDocumentMap,
+                               CountDownLatch countDownLatch) {
+        super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap, countDownLatch);
+    }
+
+    @Override
+    protected BaseEdgeDocument mergeRelationship(ArrayList<BaseEdgeDocument> newEdgeDocumentSchemaList) {
+        return super.mergeRelationship(newEdgeDocumentSchemaList);
+    }
+
+    @Override
+    protected void updateFunction(BaseEdgeDocument newEdgeDocument, BaseEdgeDocument historyEdgeDocument) {
+        super.updateFoundTime(newEdgeDocument, historyEdgeDocument);
+    }
+}
diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/relationship/VisitIp2Fqdn.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/relationship/VisitIp2Fqdn.java
similarity index 55%
rename from ip-learning-java-test/src/main/java/cn/ac/iie/etl/relationship/VisitIp2Fqdn.java
rename to ip-learning-java-test/src/main/java/cn/ac/iie/service/relationship/VisitIp2Fqdn.java
index efafb16..f5b5e3d 100644
--- a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/relationship/VisitIp2Fqdn.java
+++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/relationship/VisitIp2Fqdn.java
@@ -1,17 +1,20 @@
-package cn.ac.iie.etl.relationship;
+package cn.ac.iie.service.relationship;
 
-import cn.ac.iie.etl.update.Relationship;
+import cn.ac.iie.service.update.Relationship;
 import cn.ac.iie.utils.ArangoDBConnect;
 import com.arangodb.entity.BaseEdgeDocument;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
 
 public class VisitIp2Fqdn extends Relationship {
 
-    public VisitIp2Fqdn(HashMap<String, HashMap<String, BaseEdgeDocument>> newDocumentHashMap,
+    public VisitIp2Fqdn(HashMap<String, ArrayList<BaseEdgeDocument>> newDocumentHashMap,
                         ArangoDBConnect arangoManger,
                         String collectionName,
-                        ConcurrentHashMap<String, BaseEdgeDocument> historyDocumentMap) {
-        super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap);
+                        ConcurrentHashMap<String, BaseEdgeDocument> historyDocumentMap,
+                        CountDownLatch countDownLatch) {
+        super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap, countDownLatch);
     }
 }
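Note: VisitIp2Fqdn adds no behaviour of its own; like the two classes above it only chooses which hooks of the Relationship template to override (mergeRelationship, mergeFunction, updateFunction). The shape of that contract, reduced to a hypothetical skeleton (not a class from this repo):

    abstract class BatchUpdater<T> extends Thread {
        @Override
        public void run() {
            for (T item : fetch()) {
                apply(item);   // subclass-specific merge/update hook
            }
            finish();          // e.g. countDownLatch.countDown()
        }

        protected abstract Iterable<T> fetch();
        protected abstract void apply(T item);
        protected void finish() {} // optional hook
    }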
diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Document.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Document.java
new file mode 100644
index 0000000..ceaf7fa
--- /dev/null
+++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Document.java
@@ -0,0 +1,118 @@
+package cn.ac.iie.service.update;
+
+import cn.ac.iie.config.ApplicationConfig;
+import cn.ac.iie.utils.ArangoDBConnect;
+import com.arangodb.entity.BaseDocument;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+
+public class Document<T extends BaseDocument> extends Thread {
+    private static final Logger LOG = LoggerFactory.getLogger(Document.class);
+    private HashMap<String, ArrayList<T>> newDocumentMap;
+    private ArangoDBConnect arangoManger;
+    private String collectionName;
+    private ConcurrentHashMap<String, T> historyDocumentMap;
+    private CountDownLatch countDownLatch;
+    private Class<T> type;
+
+    Document(HashMap<String, ArrayList<T>> newDocumentMap,
+             ArangoDBConnect arangoManger,
+             String collectionName,
+             ConcurrentHashMap<String, T> historyDocumentMap,
+             CountDownLatch countDownLatch,
+             Class<T> type) {
+        this.newDocumentMap = newDocumentMap;
+        this.arangoManger = arangoManger;
+        this.collectionName = collectionName;
+        this.historyDocumentMap = historyDocumentMap;
+        this.countDownLatch = countDownLatch;
+        this.type = type;
+    }
+
+    @Override
+    public void run() {
+        Set<String> keySet = newDocumentMap.keySet();
+        ArrayList<T> resultDocumentList = new ArrayList<>();
+        int i = 0;
+        try {
+            for (String key : keySet) {
+                ArrayList<T> newDocumentSchemaList = newDocumentMap.getOrDefault(key, null);
+                if (newDocumentSchemaList != null) {
+                    T newDocument = mergeDocument(newDocumentSchemaList);
+                    i += 1;
+                    T historyDocument = historyDocumentMap.getOrDefault(key, null);
+                    updateDocument(newDocument, historyDocument, resultDocumentList);
+                    if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) {
+                        arangoManger.overwrite(resultDocumentList, collectionName);
+                        LOG.info("updated " + collectionName + ": " + i);
+                        resultDocumentList.clear();
+                        i = 0;
+                    }
+                }
+            }
+            if (i != 0) {
+                arangoManger.overwrite(resultDocumentList, collectionName);
+                LOG.info("updated " + collectionName + ": " + i);
+            }
+        } catch (Exception e) {
+            LOG.error(e.toString(), e);
+        } finally {
+            countDownLatch.countDown();
+        }
+    }
+
+    private void updateDocument(T newDocument, T historyDocument, ArrayList<T> resultDocumentList) {
+        if (historyDocument != null){
+            updateFunction(newDocument, historyDocument);
+            resultDocumentList.add(historyDocument);
+        } else {
+            resultDocumentList.add(newDocument);
+        }
+    }
+
+    protected void updateFunction(T newDocument, T historyDocument) {
+        Object lastFoundTime = newDocument.getAttribute("LAST_FOUND_TIME");
+        historyDocument.addAttribute("LAST_FOUND_TIME", lastFoundTime);
+    }
+
+    private T mergeDocument(ArrayList<T> newDocumentSchemaList) throws IllegalAccessException, InstantiationException {
+        if (newDocumentSchemaList == null || newDocumentSchemaList.isEmpty()){
+            return null;
+        } else if (newDocumentSchemaList.size() == 1){
+            return newDocumentSchemaList.get(0);
+        } else {
+            T document = type.newInstance();
+            Map<String, Object> properties = document.getProperties();
+            for (T doc : newDocumentSchemaList){
+                if (properties.isEmpty()){
+                    document = doc;
+                    properties = doc.getProperties();
+                } else {
+                    mergeFunction(properties, doc);
+                }
+            }
+            document.setProperties(properties);
+            return document;
+        }
+    }
+
+    protected void mergeFunction(Map<String, Object> properties, T doc) {
+        long firstFoundTime = Long.parseLong(properties.getOrDefault("FIRST_FOUND_TIME", 0L).toString());
+        long docFirstFoundTime = Long.parseLong(doc.getAttribute("FIRST_FOUND_TIME").toString());
+        properties.put("FIRST_FOUND_TIME", firstFoundTime < docFirstFoundTime ? firstFoundTime : docFirstFoundTime);
+        long lastFoundTime = Long.parseLong(properties.getOrDefault("LAST_FOUND_TIME", 0L).toString());
+        long docLastFoundTime = Long.parseLong(doc.getAttribute("LAST_FOUND_TIME").toString());
+        properties.put("LAST_FOUND_TIME", lastFoundTime > docLastFoundTime ? lastFoundTime : docLastFoundTime);
+    }
+
+}
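Note: a quick check of the merge semantics in Document#mergeFunction: FIRST_FOUND_TIME keeps the minimum of the two documents, LAST_FOUND_TIME the maximum. A runnable sketch against the driver's BaseDocument (the timestamps are sample values):

    import com.arangodb.entity.BaseDocument;
    import java.util.HashMap;
    import java.util.Map;

    class MergeFoundTimeDemo {
        public static void main(String[] args) {
            Map<String, Object> properties = new HashMap<>();
            properties.put("FIRST_FOUND_TIME", 1593792000L);
            properties.put("LAST_FOUND_TIME", 1593795600L);

            BaseDocument doc = new BaseDocument();
            doc.addAttribute("FIRST_FOUND_TIME", 1593788400L); // earlier than properties
            doc.addAttribute("LAST_FOUND_TIME", 1594809098L);  // later than properties

            long first = Math.min(Long.parseLong(properties.get("FIRST_FOUND_TIME").toString()),
                    Long.parseLong(doc.getAttribute("FIRST_FOUND_TIME").toString()));
            long last = Math.max(Long.parseLong(properties.get("LAST_FOUND_TIME").toString()),
                    Long.parseLong(doc.getAttribute("LAST_FOUND_TIME").toString()));

            System.out.println(first + " / " + last); // 1593788400 / 1594809098
        }
    }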
diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/update/Relationship.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Relationship.java
similarity index 53%
rename from ip-learning-java-test/src/main/java/cn/ac/iie/etl/update/Relationship.java
rename to ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Relationship.java
index 78141ae..d1172dd 100644
--- a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/update/Relationship.java
+++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Relationship.java
@@ -1,27 +1,37 @@
-package cn.ac.iie.etl.update;
+package cn.ac.iie.service.update;
 
 import cn.ac.iie.config.ApplicationConfig;
 import cn.ac.iie.utils.ArangoDBConnect;
 import com.arangodb.entity.BaseEdgeDocument;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
 
 public class Relationship extends Thread {
+    private static final Logger LOG = LoggerFactory.getLogger(Relationship.class);
 
-    protected HashMap<String, HashMap<String, BaseEdgeDocument>> newDocumentHashMap;
-    protected ArangoDBConnect arangoManger;
-    protected String collectionName;
-    protected ConcurrentHashMap<String, BaseEdgeDocument> historyDocumentMap;
+    private HashMap<String, ArrayList<BaseEdgeDocument>> newDocumentHashMap;
+    private ArangoDBConnect arangoManger;
+    private String collectionName;
+    private ConcurrentHashMap<String, BaseEdgeDocument> historyDocumentMap;
+    private CountDownLatch countDownLatch;
 
-    public Relationship(HashMap<String, HashMap<String, BaseEdgeDocument>> newDocumentHashMap,
+    public Relationship(HashMap<String, ArrayList<BaseEdgeDocument>> newDocumentHashMap,
                         ArangoDBConnect arangoManger,
                         String collectionName,
-                        ConcurrentHashMap<String, BaseEdgeDocument> historyDocumentMap) {
+                        ConcurrentHashMap<String, BaseEdgeDocument> historyDocumentMap,
+                        CountDownLatch countDownLatch) {
         this.newDocumentHashMap = newDocumentHashMap;
         this.arangoManger = arangoManger;
         this.collectionName = collectionName;
         this.historyDocumentMap = historyDocumentMap;
+        this.countDownLatch = countDownLatch;
     }
 
     @Override
@@ -31,53 +41,37 @@ public class Relationship extends Thread {
         int i = 0;
         try {
             for (String key : keySet) {
-                HashMap<String, BaseEdgeDocument> newEdgeDocumentSchemaMap = newDocumentHashMap.getOrDefault(key, null);
-                if (newEdgeDocumentSchemaMap != null) {
-                    BaseEdgeDocument newEdgeDocument = mergeRelationship(newEdgeDocumentSchemaMap);
+                ArrayList<BaseEdgeDocument> newEdgeDocumentSchemaList = newDocumentHashMap.getOrDefault(key, null);
+                if (newEdgeDocumentSchemaList != null) {
+                    BaseEdgeDocument newEdgeDocument = mergeRelationship(newEdgeDocumentSchemaList);
                     i += 1;
                     BaseEdgeDocument historyEdgeDocument = historyDocumentMap.getOrDefault(key, null);
                     updateRelationship(newEdgeDocument, historyEdgeDocument, docInsert);
                     if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) {
                         arangoManger.overwrite(docInsert, collectionName);
-                        System.out.println("updated " + collectionName + ": " + i);
+                        LOG.info("updated " + collectionName + ": " + i);
+                        docInsert.clear();
                         i = 0;
                     }
                 }
             }
             if (i != 0) {
                 arangoManger.overwrite(docInsert, collectionName);
-                System.out.println("updated " + collectionName + ": " + i);
+                LOG.info("updated " + collectionName + ": " + i);
             }
         } catch (Exception e) {
-            e.printStackTrace();
-            System.out.println(e.toString());
+            LOG.error(e.toString(), e);
+        } finally {
+            countDownLatch.countDown();
         }
     }
 
-    private BaseEdgeDocument mergeRelationship(HashMap<String, BaseEdgeDocument> newEdgeDocumentSchemaMap) {
-        BaseEdgeDocument newBaseEdgeDocument = new BaseEdgeDocument();
-        Set<String> schemaSets = newEdgeDocumentSchemaMap.keySet();
-        Map<String, Object> properties = newBaseEdgeDocument.getProperties();
-
-        for (String schema : schemaSets) {
-            BaseEdgeDocument schemaEdgeDoc = newEdgeDocumentSchemaMap.get(schema);
-            if (!properties.isEmpty()) {
-                mergeFunction(properties, schemaEdgeDoc);
-            } else {
-                newBaseEdgeDocument = schemaEdgeDoc;
-                properties = schemaEdgeDoc.getProperties();
-            }
-            setSchemaCount(schema, schemaEdgeDoc, properties);
-        }
-        properties.remove("COUNT_TOTAL");
-        checkSchemaProperty(properties);
-
-        newBaseEdgeDocument.setProperties(properties);
-        return newBaseEdgeDocument;
+    protected BaseEdgeDocument mergeRelationship(ArrayList<BaseEdgeDocument> newEdgeDocumentSchemaList) {
+        return new BaseEdgeDocument();
     }
 
     private void updateRelationship(BaseEdgeDocument newEdgeDocument, BaseEdgeDocument historyEdgeDocument, ArrayList<BaseEdgeDocument> docInsert){
-        if (historyEdgeDocument != null) {
+        if (historyEdgeDocument != null && newEdgeDocument != null) {
             updateFunction(newEdgeDocument, historyEdgeDocument);
             docInsert.add(historyEdgeDocument);
         } else {
@@ -89,7 +83,7 @@ public class Relationship extends Thread {
         updateFoundTime(newEdgeDocument, historyEdgeDocument);
         setSchemaCntByHistory(historyEdgeDocument, "TLS_CNT_RECENT", "TLS_CNT_TOTAL", newEdgeDocument);
         setSchemaCntByHistory(historyEdgeDocument, "HTTP_CNT_RECENT", "HTTP_CNT_TOTAL", newEdgeDocument);
-//        updateDistinctClientIp(newEdgeDocument,historyEdgeDocument);
+        setSchemaCntByHistory(historyEdgeDocument, "DNS_CNT_RECENT", "DNS_CNT_TOTAL", newEdgeDocument);
     }
 
     protected void updateFoundTime(BaseEdgeDocument newEdgeDocument, BaseEdgeDocument historyEdgeDocument){
@@ -97,13 +91,13 @@ public class Relationship extends Thread {
         historyEdgeDocument.addAttribute("LAST_FOUND_TIME", lastFoundTime);
     }
 
-    protected void setSchemaCntByHistory(BaseEdgeDocument historyEdgeDocument, String schema, String totalSchema, BaseEdgeDocument newEdgeDocument){
+    private void setSchemaCntByHistory(BaseEdgeDocument historyEdgeDocument, String schema, String totalSchema, BaseEdgeDocument newEdgeDocument){
         long countTotal = Long.parseLong(newEdgeDocument.getAttribute(totalSchema).toString());
         long updateCountTotal = Long.parseLong(historyEdgeDocument.getAttribute(totalSchema).toString());
 
         ArrayList<Long> cntRecent = (ArrayList<Long>) historyEdgeDocument.getAttribute(schema);
         Long[] cntRecentsSrc = cntRecent.toArray(new Long[cntRecent.size()]);
-        Long[] cntRecentsDst = new Long[7];
+        Long[] cntRecentsDst = new Long[24];
         System.arraycopy(cntRecentsSrc, 0, cntRecentsDst, 1, cntRecentsSrc.length - 1);
         cntRecentsDst[0] = countTotal;
@@ -113,41 +107,9 @@ public class Relationship extends Thread {
 
     protected void mergeFunction(Map<String, Object> properties, BaseEdgeDocument schemaEdgeDoc) {
         mergeFoundTime(properties, schemaEdgeDoc);
-//        mergeDistinctClientIp(properties,schemaEdgeDoc);
     }
 
-    protected void mergeDistinctClientIp(Map<String, Object> properties, BaseEdgeDocument schemaEdgeDoc){
-        String[] schemaDistCipRecents = (String[]) schemaEdgeDoc.getAttribute("DIST_CIP_RECENT");
-        String[] distCipRecents = (String[]) properties.get("DIST_CIP_RECENT");
-        Object[] mergeClientIp = distinctIp(schemaDistCipRecents, distCipRecents);
-        properties.put("DIST_CIP_RECENT", mergeClientIp);
-        properties.put("DIST_CIP_TOTAL", mergeClientIp);
-    }
-
-    protected void updateDistinctClientIp(BaseEdgeDocument newEdgeDocument, BaseEdgeDocument edgeDocument){
-        ArrayList<String> distCipTotal = (ArrayList<String>) edgeDocument.getAttribute("DIST_CIP_TOTAL");
-        String[] distCipTotalsSrc = distCipTotal.toArray(new String[distCipTotal.size()]);
-
-        Object[] distCipRecentsSrc = (Object[]) newEdgeDocument.getAttribute("DIST_CIP_RECENT");
-        if (distCipTotalsSrc.length == 30) {
-            Object[] distCipTotals = distinctIp(distCipTotalsSrc, distCipRecentsSrc);
-            edgeDocument.addAttribute("DIST_CIP_TOTAL", distCipTotals);
-        }
-        edgeDocument.addAttribute("DIST_CIP_RECENT", distCipRecentsSrc);
-    }
-
-    protected Object[] distinctIp(Object[] distCipTotalsSrc, Object[] distCipRecentsSrc){
-        HashSet<Object> dIpSet = new HashSet<>();
-        dIpSet.addAll(Arrays.asList(distCipRecentsSrc));
-        dIpSet.addAll(Arrays.asList(distCipTotalsSrc));
-        Object[] distCipTotals = dIpSet.toArray();
-        if (distCipTotals.length > 30) {
-            System.arraycopy(distCipTotals, 0, distCipTotals, 0, 30);
-        }
-        return distCipTotals;
-    }
-
-    protected void mergeFoundTime(Map<String, Object> properties, BaseEdgeDocument schemaEdgeDoc) {
+    private void mergeFoundTime(Map<String, Object> properties, BaseEdgeDocument schemaEdgeDoc) {
         long schemaFirstFoundTime = Long.parseLong(schemaEdgeDoc.getAttribute("FIRST_FOUND_TIME").toString());
         long firstFoundTime = Long.parseLong(properties.get("FIRST_FOUND_TIME").toString());
         properties.put("FIRST_FOUND_TIME", schemaFirstFoundTime < firstFoundTime ? schemaFirstFoundTime : firstFoundTime);
@@ -156,19 +118,19 @@ public class Relationship extends Thread {
         properties.put("LAST_FOUND_TIME", schemaLastFoundTime > lastFoundTime ? schemaLastFoundTime : lastFoundTime);
     }
 
-    protected void setSchemaCount(String schema, BaseEdgeDocument schemaEdgeDoc, Map<String, Object> properties) {
+    private void setSchemaCount(String schema, BaseEdgeDocument schemaEdgeDoc, Map<String, Object> properties) {
         switch (schema) {
             case "HTTP":
                 long httpCntTotal = Long.parseLong(schemaEdgeDoc.getAttribute("COUNT_TOTAL").toString());
                 properties.put("HTTP_CNT_TOTAL", httpCntTotal);
-                long[] httpCntRecentsDst = new long[7];
+                long[] httpCntRecentsDst = new long[24];
                 httpCntRecentsDst[0] = httpCntTotal;
                 properties.put("HTTP_CNT_RECENT", httpCntRecentsDst);
                 break;
             case "SSL":
                 long tlsCntTotal = Long.parseLong(schemaEdgeDoc.getAttribute("COUNT_TOTAL").toString());
                 properties.put("TLS_CNT_TOTAL", tlsCntTotal);
-                long[] tlsCntRecentsDst = new long[7];
+                long[] tlsCntRecentsDst = new long[24];
                 tlsCntRecentsDst[0] = tlsCntTotal;
                 properties.put("TLS_CNT_RECENT", tlsCntRecentsDst);
                 break;
@@ -177,13 +139,18 @@ public class Relationship extends Thread {
         }
     }
 
-    protected void checkSchemaProperty(Map<String, Object> properties){
+    private void checkSchemaProperty(Map<String, Object> properties){
         if (!properties.containsKey("TLS_CNT_TOTAL")){
             properties.put("TLS_CNT_TOTAL", 0L);
-            properties.put("TLS_CNT_RECENT", new long[7]);
-        }else if (!properties.containsKey("HTTP_CNT_TOTAL")){
+            properties.put("TLS_CNT_RECENT", new long[24]);
+        }
+        if (!properties.containsKey("HTTP_CNT_TOTAL")){
             properties.put("HTTP_CNT_TOTAL", 0L);
-            properties.put("HTTP_CNT_RECENT", new long[7]);
+            properties.put("HTTP_CNT_RECENT", new long[24]);
+        }
+        if (!properties.containsKey("DNS_CNT_TOTAL")){
+            properties.put("DNS_CNT_TOTAL", 0L);
+            properties.put("DNS_CNT_RECENT", new long[24]);
         }
     }
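Note: setSchemaCntByHistory maintains a per-protocol sliding window, resized in this patch from 7 slots to 24 (per-hour rather than per-day counters): shift the existing counters right by one and write the newest total into slot 0. The window mechanics in isolation (sample values only):

    import java.util.Arrays;

    class SlidingWindowDemo {
        static Long[] shift(Long[] recent, long newest) {
            Long[] dst = new Long[24];
            // copy everything except the oldest slot, one position to the right
            System.arraycopy(recent, 0, dst, 1, recent.length - 1);
            dst[0] = newest;
            return dst;
        }

        public static void main(String[] args) {
            Long[] recent = new Long[24];
            Arrays.fill(recent, 0L);
            recent[0] = 5L;
            recent[1] = 3L;
            System.out.println(Arrays.toString(shift(recent, 9L))); // [9, 5, 3, 0, ...]
        }
    }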
diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Vertex.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Vertex.java
new file mode 100644
index 0000000..eebbb74
--- /dev/null
+++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Vertex.java
@@ -0,0 +1,40 @@
+package cn.ac.iie.service.update;
+
+import cn.ac.iie.utils.ArangoDBConnect;
+import com.arangodb.entity.BaseDocument;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * @author wlh
+ * Multi-threaded update of vertex documents.
+ */
+public class Vertex extends Document<BaseDocument> {
+
+    public Vertex(HashMap<String, ArrayList<BaseDocument>> newDocumentHashMap,
+                  ArangoDBConnect arangoManger,
+                  String collectionName,
+                  ConcurrentHashMap<String, BaseDocument> historyDocumentMap,
+                  CountDownLatch countDownLatch) {
+        super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap, countDownLatch, BaseDocument.class);
+    }
+
+    @Override
+    protected void updateFunction(BaseDocument newDocument, BaseDocument historyDocument) {
+        super.updateFunction(newDocument, historyDocument);
+    }
+
+    @Override
+    protected void mergeFunction(Map<String, Object> properties, BaseDocument doc) {
+        super.mergeFunction(properties, doc);
+    }
+
+    @Override
+    public void run() {
+        super.run();
+    }
+
+}
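Note: every updater thread (Vertex and Relationship alike) takes a CountDownLatch and counts down in its finally block, so the caller can block until all updaters have flushed. How the caller side is meant to look, as a standalone sketch (worker bodies here are placeholders, not repo code):

    import java.util.concurrent.CountDownLatch;

    class LatchDemo {
        public static void main(String[] args) throws InterruptedException {
            int workers = 3;
            CountDownLatch latch = new CountDownLatch(workers);
            for (int i = 0; i < workers; i++) {
                final int id = i;
                new Thread(() -> {
                    try {
                        System.out.println("worker " + id + " updating...");
                    } finally {
                        latch.countDown(); // mirrors the finally block in run()
                    }
                }).start();
            }
            latch.await(); // blocks until every updater has finished
            System.out.println("all updaters done");
        }
    }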
diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/vertex/Fqdn.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/vertex/Fqdn.java
similarity index 61%
rename from ip-learning-java-test/src/main/java/cn/ac/iie/etl/vertex/Fqdn.java
rename to ip-learning-java-test/src/main/java/cn/ac/iie/service/vertex/Fqdn.java
index 7e2172d..976925a 100644
--- a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/vertex/Fqdn.java
+++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/vertex/Fqdn.java
@@ -1,20 +1,21 @@
-package cn.ac.iie.etl.vertex;
+package cn.ac.iie.service.vertex;
 
-import cn.ac.iie.etl.update.Vertex;
+import cn.ac.iie.service.update.Vertex;
 import cn.ac.iie.utils.ArangoDBConnect;
 import com.arangodb.entity.BaseDocument;
-import com.arangodb.entity.BaseEdgeDocument;
 
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
 
 public class Fqdn extends Vertex {
 
     public Fqdn(HashMap<String, ArrayList<BaseDocument>> newDocumentHashMap,
                 ArangoDBConnect arangoManger,
                 String collectionName,
-                ConcurrentHashMap<String, BaseDocument> historyDocumentMap) {
-        super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap);
+                ConcurrentHashMap<String, BaseDocument> historyDocumentMap,
+                CountDownLatch countDownLatch) {
+        super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap, countDownLatch);
     }
 }
diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/vertex/Ip.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/vertex/Ip.java
new file mode 100644
index 0000000..001b993
--- /dev/null
+++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/vertex/Ip.java
@@ -0,0 +1,79 @@
+package cn.ac.iie.service.vertex;
+
+import cn.ac.iie.service.update.Vertex;
+import cn.ac.iie.utils.ArangoDBConnect;
+import com.arangodb.entity.BaseDocument;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+
+public class Ip extends Vertex {
+
+    public Ip(HashMap<String, ArrayList<BaseDocument>> newDocumentHashMap,
+              ArangoDBConnect arangoManger,
+              String collectionName,
+              ConcurrentHashMap<String, BaseDocument> historyDocumentMap,
+              CountDownLatch countDownLatch) {
+        super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap, countDownLatch);
+    }
+
+    @Override
+    protected void updateFunction(BaseDocument newDocument, BaseDocument historyDocument) {
+        super.updateFunction(newDocument, historyDocument);
+        updateIpByType(newDocument, historyDocument);
+    }
+
+    @Override
+    protected void mergeFunction(Map<String, Object> properties, BaseDocument doc) {
+        super.mergeFunction(properties, doc);
+        mergeIpByType(properties, doc);
+    }
+
+    private void mergeIpByType(Map<String, Object> properties, BaseDocument doc){
+        Map<String, Object> mergeProperties = doc.getProperties();
+        checkIpTypeProperty(properties, mergeProperties, "CLIENT_SESSION_COUNT");
+        checkIpTypeProperty(properties, mergeProperties, "CLIENT_BYTES_SUM");
+        checkIpTypeProperty(properties, mergeProperties, "SERVER_SESSION_COUNT");
+        checkIpTypeProperty(properties, mergeProperties, "SERVER_BYTES_SUM");
+    }
+
+    private void checkIpTypeProperty(Map<String, Object> properties, Map<String, Object> mergeProperties, String property){
+        try {
+            if (!properties.containsKey(property)){
+                properties.put(property, 0L);
+                checkIpTypeProperty(properties, mergeProperties, property);
+            } else if ("0".equals(properties.get(property).toString()) && mergeProperties.containsKey(property)){
+                if (!"0".equals(mergeProperties.get(property).toString())){
+                    properties.put(property, Long.parseLong(mergeProperties.get(property).toString()));
+                }
+            }
+        } catch (Exception e){
+            e.printStackTrace();
+        }
+    }
+
+    private void updateIpByType(BaseDocument newDocument, BaseDocument historyDocument){
+        addProperty(newDocument, historyDocument, "CLIENT_SESSION_COUNT");
+        addProperty(newDocument, historyDocument, "CLIENT_BYTES_SUM");
+        addProperty(newDocument, historyDocument, "SERVER_SESSION_COUNT");
+        addProperty(newDocument, historyDocument, "SERVER_BYTES_SUM");
+    }
+
+    private void addProperty(BaseDocument newDocument, BaseDocument historyDocument, String property){
+        try {
+            if (historyDocument.getProperties().containsKey(property)){
+                long newProperty = Long.parseLong(newDocument.getAttribute(property).toString());
+                long hisProperty = Long.parseLong(historyDocument.getAttribute(property).toString());
+                historyDocument.updateAttribute(property, newProperty + hisProperty);
+            } else {
+                historyDocument.addAttribute(property, 0L);
+            }
+        } catch (Exception e){
+            e.printStackTrace();
+        }
+    }
+
+}
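Note: Ip#addProperty accumulates the hourly session/byte counters onto the stored vertex: if the history document already tracks the counter, the new hour's value is added to it, otherwise the counter is initialised. The rule in isolation, using the same driver methods the patch uses (sample values only):

    import com.arangodb.entity.BaseDocument;

    class AccumulateDemo {
        public static void main(String[] args) {
            BaseDocument history = new BaseDocument();
            history.addAttribute("CLIENT_SESSION_COUNT", 10L);

            BaseDocument fresh = new BaseDocument();
            fresh.addAttribute("CLIENT_SESSION_COUNT", 4L);

            long sum = Long.parseLong(history.getAttribute("CLIENT_SESSION_COUNT").toString())
                     + Long.parseLong(fresh.getAttribute("CLIENT_SESSION_COUNT").toString());
            history.updateAttribute("CLIENT_SESSION_COUNT", sum);
            System.out.println(history.getAttribute("CLIENT_SESSION_COUNT")); // 14
        }
    }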
diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/vertex/Subscriber.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/vertex/Subscriber.java
new file mode 100644
index 0000000..8689980
--- /dev/null
+++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/vertex/Subscriber.java
@@ -0,0 +1,21 @@
+package cn.ac.iie.service.vertex;
+
+import cn.ac.iie.service.update.Vertex;
+import cn.ac.iie.utils.ArangoDBConnect;
+import com.arangodb.entity.BaseDocument;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+
+public class Subscriber extends Vertex {
+
+    public Subscriber(HashMap<String, ArrayList<BaseDocument>> newDocumentHashMap,
+                      ArangoDBConnect arangoManger,
+                      String collectionName,
+                      ConcurrentHashMap<String, BaseDocument> historyDocumentMap,
+                      CountDownLatch countDownLatch) {
+        super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap, countDownLatch);
+    }
+}
diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java b/ip-learning-java-test/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java
index 6427961..e29202d 100644
--- a/ip-learning-java-test/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java
+++ b/ip-learning-java-test/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java
@@ -1,74 +1,25 @@
 package cn.ac.iie.test;
 
 import cn.ac.iie.dao.BaseArangoData;
-import cn.ac.iie.dao.BaseClickhouseData;
-import cn.ac.iie.utils.ArangoDBConnect;
-import cn.ac.iie.utils.ExecutorThreadPool;
+import cn.ac.iie.dao.UpdateGraphData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.CountDownLatch;
 
 public class IpLearningApplicationTest {
+    private static final Logger LOG = LoggerFactory.getLogger(IpLearningApplicationTest.class);
 
     public static void main(String[] args) {
-        long startA = System.currentTimeMillis();
+        long start = System.currentTimeMillis();
+        LOG.info("Ip Learning Application started");
         BaseArangoData baseArangoData = new BaseArangoData();
         baseArangoData.baseDocumentDataMap();
+        LOG.info("history data loaded, starting update");
+        UpdateGraphData updateGraphData = new UpdateGraphData();
+        updateGraphData.updateArango();
+        long last = System.currentTimeMillis();
+        LOG.info("total running time: " + (last - start));
 
-        long lastA = System.currentTimeMillis();
-        System.out.println("ArangoDB read time: " + (lastA - startA));
-
-        /*
-
-        long startC = System.currentTimeMillis();
-        CountDownLatch countDownLatch = new CountDownLatch(4);
-        new Thread(new Runnable() {
-            @Override
-            public void run() {
-                BaseClickhouseData.BaseVFqdn();
-                countDownLatch.countDown();
-            }
-        }).start();
-
-        new Thread(new Runnable() {
-            @Override
-            public void run() {
-                BaseClickhouseData.BaseVIp();
-                countDownLatch.countDown();
-            }
-        }).start();
-
-        new Thread(new Runnable() {
-            @Override
-            public void run() {
-                BaseClickhouseData.BaseEFqdnAddressIp();
-                countDownLatch.countDown();
-            }
-        }).start();
-
-        new Thread(new Runnable() {
-            @Override
-            public void run() {
-                BaseClickhouseData.BaseEIpVisitFqdn();
-                countDownLatch.countDown();
-            }
-        }).start();
-        try {
-            countDownLatch.await();
-        }catch (Exception e){
-            e.printStackTrace();
-        }
-        long lastC = System.currentTimeMillis();
-        System.out.println("ArangoDB update time: " + (lastC - startC));
-
-        */
-
-        System.out.println(BaseArangoData.v_Fqdn_Map.size());
-        System.out.println(BaseArangoData.v_Ip_Map.size());
-        System.out.println(BaseArangoData.e_Fqdn_Address_Ip_Map.size());
-        System.out.println(BaseArangoData.e_Ip_Visit_Fqdn_Map.size());
-
-        ArangoDBConnect.clean();
     }
 }
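Note: with the commented-out ClickHouse bootstrap removed, main() is reduced to read-history-then-update. A hypothetical wiring of a single vertex updater, matching the constructors introduced by this patch (the maps are empty placeholders; V_FQDN is the collection name used elsewhere in the repo):

    import com.arangodb.entity.BaseDocument;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.CountDownLatch;

    class WiringSketch {
        static void runFqdnUpdater(cn.ac.iie.utils.ArangoDBConnect conn) throws InterruptedException {
            HashMap<String, ArrayList<BaseDocument>> fresh = new HashMap<>();            // merged ClickHouse rows
            ConcurrentHashMap<String, BaseDocument> history = new ConcurrentHashMap<>(); // documents already in Arango
            CountDownLatch latch = new CountDownLatch(1);
            new cn.ac.iie.service.vertex.Fqdn(fresh, conn, "V_FQDN", history, latch).start();
            latch.await(); // returns once the updater's finally block counts down
        }
    }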
diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/test/ReadArangoDBThreadTest.java b/ip-learning-java-test/src/main/java/cn/ac/iie/test/ReadArangoDBThreadTest.java
deleted file mode 100644
index b913e6b..0000000
--- a/ip-learning-java-test/src/main/java/cn/ac/iie/test/ReadArangoDBThreadTest.java
+++ /dev/null
@@ -1,95 +0,0 @@
-package cn.ac.iie.test;
-
-import cn.ac.iie.config.ApplicationConfig;
-import com.arangodb.ArangoCursor;
-import com.arangodb.ArangoDB;
-import com.arangodb.ArangoDatabase;
-import com.arangodb.entity.BaseDocument;
-import com.arangodb.model.AqlQueryOptions;
-import com.arangodb.util.MapBuilder;
-
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-public class ReadArangoDBThreadTest {
-
-    private static ConcurrentHashMap<String, BaseDocument> fqdnMap = new ConcurrentHashMap<>();
-
-    public static void main(String[] args) throws Exception {
-        final ArangoDB arangoDB = new ArangoDB.Builder()
-                .maxConnections(ApplicationConfig.THREAD_POOL_NUMBER)
-                .host(ApplicationConfig.ARANGODB_HOST, ApplicationConfig.ARANGODB_PORT)
-                .user(ApplicationConfig.ARANGODB_USER)
-                .password(ApplicationConfig.ARANGODB_PASSWORD)
-                .build();
-        Map<String, Object> bindVars = new MapBuilder().get();
-        AqlQueryOptions options = new AqlQueryOptions().ttl(ApplicationConfig.ARANGODB_TTL);
-
-        String sql = "LET FQDN = (FOR doc IN V_FQDN RETURN doc) return {max_time:MAX(FQDN[*].FQDN_FIRST_FOUND_TIME),min_time:MIN(FQDN[*].FQDN_FIRST_FOUND_TIME)}";
-//        String sql = "LET IP = (FOR doc IN V_IP RETURN doc) return {max_time:MAX(IP[*].FIRST_FOUND_TIME),min_time:MIN(IP[*].FIRST_FOUND_TIME)}";
-        final ArangoDatabase db = arangoDB.db("insert_iplearn_index");
-        long startTime = System.currentTimeMillis();
-        ArangoCursor<BaseDocument> timeDoc = db.query(sql, bindVars, options, BaseDocument.class);
-        long maxTime = 0L;
-        long minTime = 0L;
-        while (timeDoc.hasNext()){
-            BaseDocument doc = timeDoc.next();
-            maxTime = Long.parseLong(doc.getAttribute("max_time").toString()) + ApplicationConfig.THREAD_POOL_NUMBER;
-            minTime = Long.parseLong(doc.getAttribute("min_time").toString());
-        }
-        long lastTime = System.currentTimeMillis();
-        System.out.println("max/min time query took: " + (lastTime - startTime));
-        System.out.println(maxTime + "--" + minTime);
-        final long diffTime = (maxTime - minTime) / ApplicationConfig.THREAD_POOL_NUMBER;
-        ExecutorService pool = Executors.newFixedThreadPool(ApplicationConfig.THREAD_POOL_NUMBER);
-
-        for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) {
-//            long finalMaxTime = maxTime;
-            final long finalMinTime = minTime;
-            pool.execute(new Runnable() {
-
-                public void run() {
-                    String name = Thread.currentThread().getName();
-                    ArangoDatabase insert_iplearn_index = arangoDB.db("insert_iplearn_index");
-                    Map<String, Object> bindVars = new MapBuilder().get();
-                    AqlQueryOptions options = new AqlQueryOptions().ttl(ApplicationConfig.ARANGODB_TTL);
-                    String[] split = name.split("-");
-                    Long threadNum = Long.parseLong(split[3]);
-                    long maxThreadTime = finalMinTime + threadNum * diffTime;
-                    long minThreadTime = finalMinTime + (threadNum - 1) * diffTime;
-                    String query = "FOR doc IN V_FQDN filter doc.FQDN_FIRST_FOUND_TIME >= " + minThreadTime + " and doc.FQDN_FIRST_FOUND_TIME <= " + maxThreadTime + " RETURN doc";
-//                    String query = "FOR doc IN V_IP filter doc.FIRST_FOUND_TIME >= " + minThreadTime + " and doc.FIRST_FOUND_TIME <= " + maxThreadTime + " RETURN doc";
-                    System.out.println(name + ":" + query);
-                    long s = System.currentTimeMillis();
-                    ArangoCursor<BaseDocument> fqdnDoc = insert_iplearn_index.query(query, bindVars, options, BaseDocument.class);
-                    List<BaseDocument> baseDocuments = fqdnDoc.asListRemaining();
-                    int i = 0;
-                    for (BaseDocument doc : baseDocuments){
-                        String key = doc.getKey();
-//                        System.out.println(name + ":" + key);
-                        fqdnMap.put(key, doc);
-                        i++;
-                    }
-                    /*
-                    while (fqdnDoc.hasNext()){
-                        BaseDocument doc = fqdnDoc.next();
-                    }
-                    */
-                    System.out.println(name + ":" + i);
-                    long l = System.currentTimeMillis();
-                    System.out.println(name + " elapsed: " + (l - s));
-                }
-            });
-        }
-        pool.shutdown();
-        while (!pool.awaitTermination(20, TimeUnit.SECONDS)){
-
-        }
-        System.out.println(fqdnMap.size());
-        arangoDB.shutdown();
-
-    }
-}
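Note: the deleted test's core idea survives in the service layer: split the [min_time, max_time] interval evenly across the thread pool so each worker scans a disjoint FIRST_FOUND_TIME range (it also padded max_time by THREAD_POOL_NUMBER so integer division does not drop documents at the upper boundary). The sharding arithmetic as a standalone sketch, using the time window from application.properties below:

    class TimeRangeShards {
        public static void main(String[] args) {
            long minTime = 1593792000L, maxTime = 1594809098L; // read.clickhouse.min/max.time
            int threads = 5;                                   // thread.pool.number
            long step = (maxTime - minTime) / threads;
            for (int t = 1; t <= threads; t++) {
                long lo = minTime + (t - 1) * step;
                long hi = minTime + t * step;
                System.out.println("FOR doc IN V_FQDN FILTER doc.FQDN_FIRST_FOUND_TIME >= "
                        + lo + " AND doc.FQDN_FIRST_FOUND_TIME <= " + hi + " RETURN doc");
            }
        }
    }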
diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java b/ip-learning-java-test/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java
index ddd0500..b2ce9ba 100644
--- a/ip-learning-java-test/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java
+++ b/ip-learning-java-test/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java
@@ -43,7 +43,7 @@ public class ArangoDBConnect {
         return arangoDB.db(ApplicationConfig.ARANGODB_DB_NAME);
     }
 
-    public static void clean(){
+    public void clean(){
         try {
             if (arangoDB != null){
                 arangoDB.shutdown();
diff --git a/ip-learning-java-test/src/main/resources/application.properties b/ip-learning-java-test/src/main/resources/application.properties
index 1b22db2..96c5b3b 100644
--- a/ip-learning-java-test/src/main/resources/application.properties
+++ b/ip-learning-java-test/src/main/resources/application.properties
@@ -1,5 +1,5 @@
 #arangoDB configuration
-arangoDB.host=192.168.40.182
+arangoDB.host=192.168.40.127
 arangoDB.port=8529
 arangoDB.user=root
 arangoDB.password=111111
@@ -12,5 +12,5 @@ update.arango.batch=10000
 thread.pool.number=5
 thread.await.termination.time=10
 
-read.clickhouse.max.time=1571241660
-read.clickhouse.min.time=1571241600
\ No newline at end of file
+read.clickhouse.max.time=1594809098
+read.clickhouse.min.time=1593792000
\ No newline at end of file
diff --git a/ip-learning-java-test/src/main/resources/clickhouse.properties b/ip-learning-java-test/src/main/resources/clickhouse.properties
index c5d89ac..00ebd01 100644
--- a/ip-learning-java-test/src/main/resources/clickhouse.properties
+++ b/ip-learning-java-test/src/main/resources/clickhouse.properties
@@ -1,5 +1,6 @@
 drivers=ru.yandex.clickhouse.ClickHouseDriver
-db.id=192.168.40.193:8123/av_miner?socket_timeout=300000
+#db.id=192.168.40.193:8123/av_miner?socket_timeout=300000
+db.id=192.168.40.186:8123/tsg_galaxy_v3?socket_timeout=300000
 mdb.user=default
 mdb.password=111111
 initialsize=1
diff --git a/ip-learning-java-test/src/main/resources/log4j.properties b/ip-learning-java-test/src/main/resources/log4j.properties
new file mode 100644
index 0000000..21cea3d
--- /dev/null
+++ b/ip-learning-java-test/src/main/resources/log4j.properties
@@ -0,0 +1,24 @@
+######################### logger ##############################
+log4j.logger.org.apache.http=OFF
+log4j.logger.org.apache.http.wire=OFF
+
+#Log4j
+log4j.rootLogger=info,console,file
+# console log
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.Threshold=info
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n
+
+# file log
+log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.file.Threshold=info
+log4j.appender.file.encoding=UTF-8
+log4j.appender.file.Append=true
+# change the path below to the deployment's log directory
+#log4j.appender.file.file=/home/ceiec/iplearning/logs/ip-learning-application.log
+log4j.appender.file.file=./logs/ip-learning-application.log
+log4j.appender.file.DatePattern='.'yyyy-MM-dd
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+#log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n
+log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n
diff --git a/ip-learning-java-test/src/test/java/cn/ac/iie/TestArango.java b/ip-learning-java-test/src/test/java/cn/ac/iie/TestArango.java
index 6893133..be52053 100644
--- a/ip-learning-java-test/src/test/java/cn/ac/iie/TestArango.java
+++ b/ip-learning-java-test/src/test/java/cn/ac/iie/TestArango.java
@@ -16,6 +16,6 @@ public class TestArango {
             BaseEdgeDocument next = baseEdgeDocuments.next();
             System.out.println(next.toString());
         }
-        ArangoDBConnect.clean();
+//        ArangoDBConnect.clean();
     }
 }