From 2c9ff1aa3c7394b5e6a3e10154b6c6b1d8814a8d Mon Sep 17 00:00:00 2001 From: wanglihui <949764788@qq.com> Date: Sun, 28 Jun 2020 18:29:39 +0800 Subject: [PATCH] first commit --- ip-learning-java-test/.gitignore | 8 + ip-learning-java-test/pom.xml | 77 ++++++ .../cn/ac/iie/config/ApplicationConfig.java | 25 ++ .../java/cn/ac/iie/dao/BaseArangoData.java | 87 ++++++ .../cn/ac/iie/dao/BaseClickhouseData.java | 258 ++++++++++++++++++ .../ac/iie/etl/ArangoEFqdnAddressIpToMap.java | 49 ++++ .../ac/iie/etl/ArangoEIpVisitFqdnToMap.java | 49 ++++ .../java/cn/ac/iie/etl/ArangoVFqdnToMap.java | 50 ++++ .../java/cn/ac/iie/etl/ArangoVIpToMap.java | 48 ++++ .../cn/ac/iie/etl/UpdateEFqdnAddressIp.java | 58 ++++ .../cn/ac/iie/etl/UpdateEIpVisitFqdn.java | 58 ++++ .../java/cn/ac/iie/etl/UpdateGraphsData.java | 214 +++++++++++++++ .../main/java/cn/ac/iie/etl/UpdateVFqdn.java | 60 ++++ .../main/java/cn/ac/iie/etl/UpdateVIP.java | 60 ++++ .../iie/test/IpLearningApplicationTest.java | 78 ++++++ .../ac/iie/test/ReadArangoDBThreadTest.java | 95 +++++++ .../java/cn/ac/iie/utils/ArangoDBConnect.java | 84 ++++++ .../cn/ac/iie/utils/ClickhouseConnect.java | 103 +++++++ .../java/cn/ac/iie/utils/ConfigUtils.java | 36 +++ .../cn/ac/iie/utils/ExecutorThreadPool.java | 55 ++++ .../src/main/resources/application.properties | 16 ++ .../src/main/resources/clickhouse.properties | 7 + 22 files changed, 1575 insertions(+) create mode 100644 ip-learning-java-test/.gitignore create mode 100644 ip-learning-java-test/pom.xml create mode 100644 ip-learning-java-test/src/main/java/cn/ac/iie/config/ApplicationConfig.java create mode 100644 ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseArangoData.java create mode 100644 ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java create mode 100644 ip-learning-java-test/src/main/java/cn/ac/iie/etl/ArangoEFqdnAddressIpToMap.java create mode 100644 ip-learning-java-test/src/main/java/cn/ac/iie/etl/ArangoEIpVisitFqdnToMap.java create mode 100644 ip-learning-java-test/src/main/java/cn/ac/iie/etl/ArangoVFqdnToMap.java create mode 100644 ip-learning-java-test/src/main/java/cn/ac/iie/etl/ArangoVIpToMap.java create mode 100644 ip-learning-java-test/src/main/java/cn/ac/iie/etl/UpdateEFqdnAddressIp.java create mode 100644 ip-learning-java-test/src/main/java/cn/ac/iie/etl/UpdateEIpVisitFqdn.java create mode 100644 ip-learning-java-test/src/main/java/cn/ac/iie/etl/UpdateGraphsData.java create mode 100644 ip-learning-java-test/src/main/java/cn/ac/iie/etl/UpdateVFqdn.java create mode 100644 ip-learning-java-test/src/main/java/cn/ac/iie/etl/UpdateVIP.java create mode 100644 ip-learning-java-test/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java create mode 100644 ip-learning-java-test/src/main/java/cn/ac/iie/test/ReadArangoDBThreadTest.java create mode 100644 ip-learning-java-test/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java create mode 100644 ip-learning-java-test/src/main/java/cn/ac/iie/utils/ClickhouseConnect.java create mode 100644 ip-learning-java-test/src/main/java/cn/ac/iie/utils/ConfigUtils.java create mode 100644 ip-learning-java-test/src/main/java/cn/ac/iie/utils/ExecutorThreadPool.java create mode 100644 ip-learning-java-test/src/main/resources/application.properties create mode 100644 ip-learning-java-test/src/main/resources/clickhouse.properties diff --git a/ip-learning-java-test/.gitignore b/ip-learning-java-test/.gitignore new file mode 100644 index 0000000..ac0cafa --- /dev/null +++ b/ip-learning-java-test/.gitignore @@ -0,0 +1,8 @@ +# Created by .ignore support plugin (hsz.mobi) +### Example user template template +### Example user template + +# IntelliJ project files +.idea +*.iml +target diff --git a/ip-learning-java-test/pom.xml b/ip-learning-java-test/pom.xml new file mode 100644 index 0000000..73c4361 --- /dev/null +++ b/ip-learning-java-test/pom.xml @@ -0,0 +1,77 @@ + + + 4.0.0 + + cn.ac.iie + ip-learning-java-test + 1.0-SNAPSHOT + + + + + ru.yandex.clickhouse + clickhouse-jdbc + 0.2.4 + + + + com.alibaba + druid + 1.1.10 + + + + com.typesafe + config + 1.2.1 + + + + com.arangodb + arangodb-java-driver + 4.2.2 + + + + + + + org.apache.maven.plugins + maven-assembly-plugin + 2.6 + + + + cn.ac.iie.test.IpLearningApplicationTest + + + + jar-with-dependencies + + + + + make-assembly + package + + single + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 8 + 8 + + + + + + + + \ No newline at end of file diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/config/ApplicationConfig.java b/ip-learning-java-test/src/main/java/cn/ac/iie/config/ApplicationConfig.java new file mode 100644 index 0000000..d559d07 --- /dev/null +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/config/ApplicationConfig.java @@ -0,0 +1,25 @@ +package cn.ac.iie.config; + + +import cn.ac.iie.utils.ConfigUtils; + +public class ApplicationConfig { + + public static final String ARANGODB_HOST = ConfigUtils.getStringProperty( "arangoDB.host"); + public static final Integer ARANGODB_PORT = ConfigUtils.getIntProperty("arangoDB.port"); + public static final String ARANGODB_USER = ConfigUtils.getStringProperty( "arangoDB.user"); + public static final String ARANGODB_PASSWORD = ConfigUtils.getStringProperty( "arangoDB.password"); + public static final String ARANGODB_DB_NAME = ConfigUtils.getStringProperty( "arangoDB.DB.name"); + public static final Integer ARANGODB_TTL = ConfigUtils.getIntProperty( "arangoDB.ttl"); + public static final Integer ARANGODB_BATCH = ConfigUtils.getIntProperty( "arangoDB.batch"); + + public static final Integer UPDATE_ARANGO_BATCH =ConfigUtils.getIntProperty("update.arango.batch"); + + public static final Integer THREAD_POOL_NUMBER = ConfigUtils.getIntProperty( "thread.pool.number"); + public static final Integer THREAD_AWAIT_TERMINATION_TIME = ConfigUtils.getIntProperty( "thread.await.termination.time"); + + public static final Long READ_CLICKHOUSE_MAX_TIME = ConfigUtils.getLongProperty("read.clickhouse.max.time"); + public static final Long READ_CLICKHOUSE_MIN_TIME = ConfigUtils.getLongProperty("read.clickhouse.min.time"); + + +} diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseArangoData.java b/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseArangoData.java new file mode 100644 index 0000000..1be006a --- /dev/null +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseArangoData.java @@ -0,0 +1,87 @@ +package cn.ac.iie.dao; + +import cn.ac.iie.config.ApplicationConfig; +import cn.ac.iie.etl.ArangoEFqdnAddressIpToMap; +import cn.ac.iie.etl.ArangoEIpVisitFqdnToMap; +import cn.ac.iie.etl.ArangoVFqdnToMap; +import cn.ac.iie.etl.ArangoVIpToMap; +import cn.ac.iie.utils.ArangoDBConnect; +import cn.ac.iie.utils.ExecutorThreadPool; +import com.arangodb.ArangoCursor; +import com.arangodb.entity.BaseDocument; +import com.arangodb.entity.BaseEdgeDocument; + +import java.util.concurrent.ConcurrentHashMap; + +public class BaseArangoData { + public static ConcurrentHashMap v_Fqdn_Map = new ConcurrentHashMap<>(); + public static ConcurrentHashMap v_Ip_Map = new ConcurrentHashMap<>(); + public static ConcurrentHashMap e_Fqdn_Address_Ip_Map = new ConcurrentHashMap<>(); + public static ConcurrentHashMap e_Ip_Visit_Fqdn_Map = new ConcurrentHashMap<>(); + + private static final ArangoDBConnect arangoDBConnect = ArangoDBConnect.getInstance(); + + private static final ExecutorThreadPool threadPool = ExecutorThreadPool.getInstance(); + + public static void BaseVFqdnDataMap() { + String sql = "LET FQDN = (FOR doc IN V_FQDN RETURN doc) return {max_time:MAX(FQDN[*].FIRST_FOUND_TIME),min_time:MIN(FQDN[*].FIRST_FOUND_TIME)}"; + long[] timeLimit = getTimeLimit(sql); + for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { + ArangoVFqdnToMap ArangoVFqdnToMap = new ArangoVFqdnToMap(arangoDBConnect, timeLimit[0], timeLimit[2],i); + threadPool.executor(ArangoVFqdnToMap); + } + } + + public static void BaseVIpDataMap() { + String sql = "LET IP = (FOR doc IN V_IP RETURN doc) return {max_time:MAX(IP[*].FIRST_FOUND_TIME),min_time:MIN(IP[*].FIRST_FOUND_TIME)}"; + long[] timeLimit = getTimeLimit(sql); + for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { + ArangoVIpToMap arangoVIpToMap = new ArangoVIpToMap(arangoDBConnect, timeLimit[0], timeLimit[2],i); + threadPool.executor(arangoVIpToMap); + } + } + + public static void BaseEFqdnAddressIpDataMap(){ + String sql = "LET e = (FOR doc IN E_ADDRESS_V_FQDN_TO_V_IP RETURN doc) return {max_time:MAX(e[*].FIRST_FOUND_TIME),min_time:MIN(e[*].FIRST_FOUND_TIME)}"; + long[] timeLimit = getTimeLimit(sql); + for (int i = 0;i < ApplicationConfig.THREAD_POOL_NUMBER;i++){ + ArangoEFqdnAddressIpToMap arangoEFqdnAddressIpToMap = new ArangoEFqdnAddressIpToMap(arangoDBConnect, timeLimit[0], timeLimit[2], i); + threadPool.executor(arangoEFqdnAddressIpToMap); + } + } + + public static void BaseEIpVisitFqdnDataMap(){ + String sql = "LET e = (FOR doc IN E_VISIT_V_IP_TO_V_FQDN RETURN doc) return {max_time:MAX(e[*].FIRST_FOUND_TIME),min_time:MIN(e[*].FIRST_FOUND_TIME)}"; + long[] timeLimit = getTimeLimit(sql); + for (int i = 0;i < ApplicationConfig.THREAD_POOL_NUMBER;i++){ + ArangoEIpVisitFqdnToMap arangoEIpVisitFqdnToMap = new ArangoEIpVisitFqdnToMap(arangoDBConnect, timeLimit[0], timeLimit[2], i); + threadPool.executor(arangoEIpVisitFqdnToMap); + } + } + + private static long[] getTimeLimit(String sql) { + long minTime = 0L; + long maxTime = 0L; + long diffTime = 0L; + long startTime = System.currentTimeMillis(); + ArangoCursor timeDoc = arangoDBConnect.executorQuery(sql, BaseDocument.class); + try { + if (timeDoc != null){ + while (timeDoc.hasNext()) { + BaseDocument doc = timeDoc.next(); + maxTime = Long.parseLong(doc.getAttribute("max_time").toString()) + ApplicationConfig.THREAD_POOL_NUMBER; + minTime = Long.parseLong(doc.getAttribute("min_time").toString()); + } + long lastTime = System.currentTimeMillis(); + System.out.println("查询最大最小时间用时:" + (lastTime - startTime)); + diffTime = (maxTime - minTime) / ApplicationConfig.THREAD_POOL_NUMBER; + }else { + System.out.println("获取最大最小时间异常"); + } + }catch (Exception e){ + e.printStackTrace(); + } + return new long[]{minTime, maxTime, diffTime}; + } + +} diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java b/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java new file mode 100644 index 0000000..10dce04 --- /dev/null +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java @@ -0,0 +1,258 @@ +package cn.ac.iie.dao; + +import cn.ac.iie.config.ApplicationConfig; +import cn.ac.iie.etl.UpdateEFqdnAddressIp; +import cn.ac.iie.etl.UpdateEIpVisitFqdn; +import cn.ac.iie.etl.UpdateVFqdn; +import cn.ac.iie.etl.UpdateVIP; +import cn.ac.iie.utils.ClickhouseConnect; +import com.alibaba.druid.pool.DruidPooledConnection; +import com.arangodb.entity.BaseDocument; +import com.arangodb.entity.BaseEdgeDocument; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.HashMap; + +public class BaseClickhouseData { + private static final ClickhouseConnect manger = ClickhouseConnect.getInstance(); + private static HashMap> vFqdnMap = new HashMap<>(); + private static HashMap> vIpMap = new HashMap<>(); + private static HashMap> eFqdnAddressIpMap = new HashMap<>(); + private static HashMap> eIpVisitFqdnMap = new HashMap<>(); + public Connection connection; + public Statement pstm; + + public BaseClickhouseData(){} + + public ResultSet BaseRealTimeVFqdn(){ + long[] timeLimit = getTimeLimit(); + long maxTime = timeLimit[0]; + long minTime = timeLimit[1]; + String where = "recv_time >= "+minTime+" and recv_time <= "+maxTime+" and media_domain != '' "; + String sql = "SELECT media_domain AS FQDN_NAME,MIN(recv_time) AS FIRST_FOUND_TIME,MAX(recv_time) AS LAST_FOUND_TIME,COUNT( * ) AS FQDN_COUNT_TOTAL FROM media_expire_patch WHERE "+where+" GROUP BY media_domain"; + System.out.println(sql); + return manger.executorQuery(sql,connection,pstm); + } + + public ResultSet BaseRealTimeVIp(){ + long[] timeLimit = getTimeLimit(); + long maxTime = timeLimit[0]; + long minTime = timeLimit[1]; + String where = " recv_time >= "+minTime+" and recv_time <= "+maxTime; + String sql = "SELECT IP,location,MIN(recv_time) AS FIRST_FOUND_TIME,MAX(recv_time) AS LAST_FOUND_TIME,COUNT(*) AS IP_COUNT_TOTAL FROM(( SELECT s1_s_ip AS IP, s1_s_location_region AS location, recv_time FROM media_expire_patch where "+where+" ) UNION ALL ( SELECT s1_d_ip AS IP, s1_d_location_region AS location, recv_time FROM media_expire_patch where "+where+" )) GROUP BY IP,location"; + System.out.println(sql); + return manger.executorQuery(sql,connection,pstm); + } + + public ResultSet BaseReadTimeEFqdnAddressIp(){ + long[] timeLimit = getTimeLimit(); + long maxTime = timeLimit[0]; + long minTime = timeLimit[1]; + String where = "recv_time >= "+minTime+" and recv_time <= "+maxTime +" AND media_domain != '' AND s1_d_ip != '' "; + String sql = "SELECT media_domain AS V_FQDN,s1_d_ip AS V_IP,MIN( recv_time ) AS FIRST_FOUND_TIME,MAX( recv_time ) AS LAST_FOUND_TIME,COUNT( * ) AS COUNT_TOTAL FROM media_expire_patch WHERE "+where+" GROUP BY s1_d_ip,media_domain"; + System.out.println(sql); + return manger.executorQuery(sql,connection,pstm); + } + + public ResultSet BaseRealTimeEIpVisitFqdn(){ + long[] timeLimit = getTimeLimit(); + long maxTime = timeLimit[0]; + long minTime = timeLimit[1]; + String where = "recv_time >= "+minTime+" and recv_time <= "+maxTime+" AND s1_s_ip != '' AND media_domain != '' "; + String sql = "SELECT s1_s_ip AS V_IP,media_domain AS V_FQDN,MIN( recv_time ) AS FIRST_FOUND_TIME,MAX( recv_time ) AS LAST_FOUND_TIME,COUNT( * ) AS COUNT_TOTAL FROM media_expire_patch WHERE "+where+" GROUP BY s1_s_ip,media_domain"; + System.out.println(sql); + return manger.executorQuery(sql,connection,pstm); + } + + private static long[] getTimeLimit(){ + long maxTime = ApplicationConfig.READ_CLICKHOUSE_MAX_TIME; + long minTime = ApplicationConfig.READ_CLICKHOUSE_MIN_TIME; + return new long[]{maxTime,minTime}; + } + + static { + for (int i = 0;i < ApplicationConfig.THREAD_POOL_NUMBER;i++){ + vFqdnMap.put(i,new HashMap()); + } + System.out.println("V_FQDN resultMap初始化完成"); + for (int i = 0;i < ApplicationConfig.THREAD_POOL_NUMBER;i++){ + vIpMap.put(i,new HashMap()); + } + System.out.println("V_IP resultMap初始化完成"); + for (int i = 0;i < ApplicationConfig.THREAD_POOL_NUMBER;i++){ + eFqdnAddressIpMap.put(i,new HashMap()); + } + System.out.println("E_ADDRESS_V_FQDN_TO_V_IP resultMap初始化完成"); + for (int i = 0;i < ApplicationConfig.THREAD_POOL_NUMBER;i++){ + eIpVisitFqdnMap.put(i,new HashMap()); + } + System.out.println("E_VISIT_V_IP_TO_V_FQDN resultMap初始化完成"); + } + + public static void BaseVFqdn(){ + long[] timeLimit = getTimeLimit(); + long maxTime = timeLimit[0]; + long minTime = timeLimit[1]; + String where = "recv_time >= "+minTime+" and recv_time <= "+maxTime+" and media_domain != '' "; + String sql = "SELECT media_domain AS FQDN_NAME,MIN(recv_time) AS FIRST_FOUND_TIME,MAX(recv_time) AS LAST_FOUND_TIME,COUNT( * ) AS FQDN_COUNT_TOTAL FROM media_expire_patch WHERE "+where+" GROUP BY media_domain"; + System.out.println(sql); + long start = System.currentTimeMillis(); + try { + DruidPooledConnection connection = manger.getConnection(); + Statement statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery(sql); + while (resultSet.next()){ + String fqdnName = resultSet.getString("FQDN_NAME"); + long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); + long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); + long fqdnCountTotal = resultSet.getLong("FQDN_COUNT_TOTAL"); + BaseDocument newDoc = new BaseDocument(); + newDoc.setKey(fqdnName); + newDoc.addAttribute("FQDN_NAME",fqdnName); + newDoc.addAttribute("FIRST_FOUND_TIME",firstFoundTime); + newDoc.addAttribute("LAST_FOUND_TIME",lastFoundTime); + newDoc.addAttribute("FQDN_COUNT_TOTAL",fqdnCountTotal); + int i = fqdnName.hashCode() % ApplicationConfig.THREAD_POOL_NUMBER; + HashMap documentHashMap = vFqdnMap.getOrDefault(i, new HashMap()); + documentHashMap.put(fqdnName,newDoc); + } + long last = System.currentTimeMillis(); + System.out.println("读取clickhouse v_FQDN时间:"+(last - start)); + for (int i = 0;i < ApplicationConfig.THREAD_POOL_NUMBER;i++){ + HashMap baseDocumentHashMap = vFqdnMap.get(i); + UpdateVFqdn updateVFqdn = new UpdateVFqdn(baseDocumentHashMap); + updateVFqdn.run(); + } + }catch (Exception e){ + e.printStackTrace(); + } + } + + public static void BaseVIp(){ + long[] timeLimit = getTimeLimit(); + long maxTime = timeLimit[0]; + long minTime = timeLimit[1]; + String where = " recv_time >= "+minTime+" and recv_time <= "+maxTime; + String sql = "SELECT IP,location,MIN(recv_time) AS FIRST_FOUND_TIME,MAX(recv_time) AS LAST_FOUND_TIME,COUNT(*) AS IP_COUNT_TOTAL FROM(( SELECT s1_s_ip AS IP, s1_s_location_region AS location, recv_time FROM media_expire_patch where "+where+" ) UNION ALL ( SELECT s1_d_ip AS IP, s1_d_location_region AS location, recv_time FROM media_expire_patch where "+where+" )) GROUP BY IP,location"; + System.out.println(sql); + long start = System.currentTimeMillis(); + try { + DruidPooledConnection connection = manger.getConnection(); + Statement statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery(sql); + while (resultSet.next()){ + String ip = resultSet.getString("IP"); + String location = resultSet.getString("location"); + long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); + long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); + long ipCountTotal = resultSet.getLong("IP_COUNT_TOTAL"); + BaseDocument newDoc = new BaseDocument(); + newDoc.setKey(ip); + newDoc.addAttribute("IP",ip); + newDoc.addAttribute("IP_LOCATION",location); + newDoc.addAttribute("FIRST_FOUND_TIME",firstFoundTime); + newDoc.addAttribute("LAST_FOUND_TIME",lastFoundTime); + newDoc.addAttribute("IP_COUNT_TOTAL",ipCountTotal); + int i = ip.hashCode() % ApplicationConfig.THREAD_POOL_NUMBER; + HashMap documentHashMap = vIpMap.getOrDefault(i, new HashMap()); + documentHashMap.put(ip,newDoc); + } + long last = System.currentTimeMillis(); + System.out.println("读取clickhouse v_IP时间:"+(last - start)); + for (int i = 0;i < ApplicationConfig.THREAD_POOL_NUMBER;i++){ + HashMap baseDocumentHashMap = vIpMap.get(i); + UpdateVIP updateVIp = new UpdateVIP(baseDocumentHashMap); + updateVIp.run(); + } + }catch (Exception e){ + e.printStackTrace(); + } + } + + public static void BaseEFqdnAddressIp(){ + long[] timeLimit = getTimeLimit(); + long maxTime = timeLimit[0]; + long minTime = timeLimit[1]; + String where = "recv_time >= "+minTime+" and recv_time <= "+maxTime +" AND media_domain != '' AND s1_d_ip != '' "; + String sql = "SELECT media_domain AS V_FQDN,s1_d_ip AS V_IP,MIN( recv_time ) AS FIRST_FOUND_TIME,MAX( recv_time ) AS LAST_FOUND_TIME,COUNT( * ) AS COUNT_TOTAL FROM media_expire_patch WHERE "+where+" GROUP BY s1_d_ip,media_domain"; + System.out.println(sql); + long start = System.currentTimeMillis(); + try { + DruidPooledConnection connection = manger.getConnection(); + Statement statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery(sql); + while (resultSet.next()){ + String vFqdn = resultSet.getString("V_FQDN"); + String vIp = resultSet.getString("V_IP"); + long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); + long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); + long countTotal = resultSet.getLong("COUNT_TOTAL"); + String key = vFqdn+"-"+vIp; + BaseEdgeDocument newDoc = new BaseEdgeDocument(); + newDoc.setKey(key); + newDoc.setFrom("V_FQDN/"+vFqdn); + newDoc.setTo("V_IP/"+vIp); + newDoc.addAttribute("FIRST_FOUND_TIME",firstFoundTime); + newDoc.addAttribute("LAST_FOUND_TIME",lastFoundTime); + newDoc.addAttribute("COUNT_TOTAL",countTotal); + int i = key.hashCode() % ApplicationConfig.THREAD_POOL_NUMBER; + HashMap documentHashMap = eFqdnAddressIpMap.getOrDefault(i, new HashMap()); + documentHashMap.put(key,newDoc); + } + long last = System.currentTimeMillis(); + System.out.println("读取clickhouse EFqdnAddressIp时间:"+(last - start)); + for (int i = 0;i < ApplicationConfig.THREAD_POOL_NUMBER;i++){ + HashMap baseDocumentHashMap = eFqdnAddressIpMap.get(i); + UpdateEFqdnAddressIp updateEFqdnAddressIp = new UpdateEFqdnAddressIp(baseDocumentHashMap); + updateEFqdnAddressIp.run(); + } + }catch (Exception e){ + e.printStackTrace(); + } + } + + public static void BaseEIpVisitFqdn(){ + long[] timeLimit = getTimeLimit(); + long maxTime = timeLimit[0]; + long minTime = timeLimit[1]; + String where = "recv_time >= "+minTime+" and recv_time <= "+maxTime+" AND s1_s_ip != '' AND media_domain != '' "; + String sql = "SELECT s1_s_ip AS V_IP,media_domain AS V_FQDN,MIN( recv_time ) AS FIRST_FOUND_TIME,MAX( recv_time ) AS LAST_FOUND_TIME,COUNT( * ) AS COUNT_TOTAL FROM media_expire_patch WHERE "+where+" GROUP BY s1_s_ip,media_domain"; + System.out.println(sql); + long start = System.currentTimeMillis(); + try { + DruidPooledConnection connection = manger.getConnection(); + Statement statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery(sql); + while (resultSet.next()){ + String vIp = resultSet.getString("V_IP"); + String vFqdn = resultSet.getString("V_FQDN"); + long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); + long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); + long countTotal = resultSet.getLong("COUNT_TOTAL"); + String key = vIp +"-"+ vFqdn; + BaseEdgeDocument newDoc = new BaseEdgeDocument(); + newDoc.setKey(key); + newDoc.setFrom("V_IP/"+vIp); + newDoc.setTo("V_FQDN/"+vFqdn); + newDoc.addAttribute("FIRST_FOUND_TIME",firstFoundTime); + newDoc.addAttribute("LAST_FOUND_TIME",lastFoundTime); + newDoc.addAttribute("COUNT_TOTAL",countTotal); + int i = key.hashCode() % ApplicationConfig.THREAD_POOL_NUMBER; + HashMap documentHashMap = eIpVisitFqdnMap.getOrDefault(i, new HashMap()); + documentHashMap.put(key,newDoc); + } + long last = System.currentTimeMillis(); + System.out.println("读取clickhouse EIpVisitFqdn时间:"+(last - start)); + for (int i = 0;i < ApplicationConfig.THREAD_POOL_NUMBER;i++){ + HashMap baseDocumentHashMap = eIpVisitFqdnMap.get(i); + UpdateEIpVisitFqdn updateEIpVisitFqdn = new UpdateEIpVisitFqdn(baseDocumentHashMap); + updateEIpVisitFqdn.run(); + } + }catch (Exception e){ + e.printStackTrace(); + } + } + +} diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/ArangoEFqdnAddressIpToMap.java b/ip-learning-java-test/src/main/java/cn/ac/iie/etl/ArangoEFqdnAddressIpToMap.java new file mode 100644 index 0000000..a2ca34b --- /dev/null +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/etl/ArangoEFqdnAddressIpToMap.java @@ -0,0 +1,49 @@ +package cn.ac.iie.etl; + +import cn.ac.iie.dao.BaseArangoData; +import cn.ac.iie.utils.ArangoDBConnect; +import com.arangodb.ArangoCursor; +import com.arangodb.entity.BaseEdgeDocument; + +import java.util.List; + +public class ArangoEFqdnAddressIpToMap implements Runnable{ + + private ArangoDBConnect arangoDBConnect; + private long finalMinTime; + private long diffTime; + private int threadNumber; + + private ArangoEFqdnAddressIpToMap(){} + + public ArangoEFqdnAddressIpToMap(ArangoDBConnect arangoDBConnect, long finalMinTime, long diffTime, int threadNumber) { + this.arangoDBConnect = arangoDBConnect; + this.finalMinTime = finalMinTime; + this.diffTime = diffTime; + this.threadNumber = threadNumber; + } + + public void run() { + String name = Thread.currentThread().getName(); + long maxThreadTime = finalMinTime + (threadNumber + 1) * diffTime; + long minThreadTime = finalMinTime + threadNumber * diffTime; + String query = "FOR doc IN E_ADDRESS_V_FQDN_TO_V_IP filter doc.FIRST_FOUND_TIME >= " + minThreadTime + " and doc.FIRST_FOUND_TIME <= " + maxThreadTime + " RETURN doc"; + System.out.println(name + ":" + query); + long s = System.currentTimeMillis(); + ArangoCursor docs = arangoDBConnect.executorQuery(query, BaseEdgeDocument.class); + if (docs != null){ + List baseDocuments = docs.asListRemaining(); + int i = 0; + for (BaseEdgeDocument doc : baseDocuments) { + String key = doc.getKey(); + BaseArangoData.e_Fqdn_Address_Ip_Map.put(key, doc); + i++; + } + System.out.println(name + ":共处理数据" + i); + long l = System.currentTimeMillis(); + System.out.println(name + "运行时间:" + (l - s)); + }else { + System.out.println("查询异常"); + } + } +} diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/ArangoEIpVisitFqdnToMap.java b/ip-learning-java-test/src/main/java/cn/ac/iie/etl/ArangoEIpVisitFqdnToMap.java new file mode 100644 index 0000000..0bf7ca9 --- /dev/null +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/etl/ArangoEIpVisitFqdnToMap.java @@ -0,0 +1,49 @@ +package cn.ac.iie.etl; + +import cn.ac.iie.dao.BaseArangoData; +import cn.ac.iie.utils.ArangoDBConnect; +import com.arangodb.ArangoCursor; +import com.arangodb.entity.BaseEdgeDocument; + +import java.util.List; + +public class ArangoEIpVisitFqdnToMap implements Runnable { + private ArangoDBConnect arangoDBConnect; + private long finalMinTime; + private long diffTime; + private int threadNumber; + + private ArangoEIpVisitFqdnToMap(){} + + public ArangoEIpVisitFqdnToMap(ArangoDBConnect arangoDBConnect, long finalMinTime, long diffTime, int threadNumber) { + this.arangoDBConnect = arangoDBConnect; + this.finalMinTime = finalMinTime; + this.diffTime = diffTime; + this.threadNumber = threadNumber; + } + + public void run() { + String name = Thread.currentThread().getName(); + long maxThreadTime = finalMinTime + (threadNumber + 1) * diffTime; + long minThreadTime = finalMinTime + threadNumber * diffTime; + String query = "FOR doc IN E_VISIT_V_IP_TO_V_FQDN filter doc.FIRST_FOUND_TIME >= " + minThreadTime + " and doc.FIRST_FOUND_TIME <= " + maxThreadTime + " RETURN doc"; + System.out.println(name + ":" + query); + long s = System.currentTimeMillis(); + ArangoCursor docs = arangoDBConnect.executorQuery(query, BaseEdgeDocument.class); + + if (docs != null){ + List baseDocuments = docs.asListRemaining(); + int i = 0; + for (BaseEdgeDocument doc : baseDocuments) { + String key = doc.getKey(); + BaseArangoData.e_Ip_Visit_Fqdn_Map.put(key, doc); + i++; + } + System.out.println(name + ":共处理数据" + i); + long l = System.currentTimeMillis(); + System.out.println(name + "运行时间:" + (l - s)); + }else { + System.out.println("查询异常"); + } + } +} diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/ArangoVFqdnToMap.java b/ip-learning-java-test/src/main/java/cn/ac/iie/etl/ArangoVFqdnToMap.java new file mode 100644 index 0000000..bbbeb5d --- /dev/null +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/etl/ArangoVFqdnToMap.java @@ -0,0 +1,50 @@ +package cn.ac.iie.etl; + +import cn.ac.iie.dao.BaseArangoData; +import cn.ac.iie.utils.ArangoDBConnect; +import com.arangodb.ArangoCursor; +import com.arangodb.entity.BaseDocument; + +import java.util.List; + +public class ArangoVFqdnToMap implements Runnable { + + private ArangoDBConnect arangoDBConnect; + private long finalMinTime; + private long diffTime; + private int threadNumber; + + private ArangoVFqdnToMap(){} + + public ArangoVFqdnToMap(ArangoDBConnect arangoDBConnect, long finalMinTime, long diffTime, int threadNumber) { + this.arangoDBConnect = arangoDBConnect; + this.finalMinTime = finalMinTime; + this.diffTime = diffTime; + this.threadNumber = threadNumber; + } + + public void run() { + String name = Thread.currentThread().getName(); + long maxThreadTime = finalMinTime + (threadNumber + 1)* diffTime; + long minThreadTime = finalMinTime + threadNumber * diffTime; + String query = "FOR doc IN V_FQDN filter doc.FIRST_FOUND_TIME >= "+minThreadTime+" and doc.FIRST_FOUND_TIME <= "+maxThreadTime+" RETURN doc"; + System.out.println(name+":"+query); + long s = System.currentTimeMillis(); + ArangoCursor docs = arangoDBConnect.executorQuery(query, BaseDocument.class); + + if (docs != null){ + List baseDocuments = docs.asListRemaining(); + int i = 0; + for (BaseDocument doc:baseDocuments){ + String key = doc.getKey(); + BaseArangoData.v_Fqdn_Map.put(key,doc); + i++; + } + System.out.println(name+":共处理数据"+ i); + long l = System.currentTimeMillis(); + System.out.println(name+"运行时间:"+(l-s)); + }else { + System.out.println("查询异常"); + } + } +} diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/ArangoVIpToMap.java b/ip-learning-java-test/src/main/java/cn/ac/iie/etl/ArangoVIpToMap.java new file mode 100644 index 0000000..0f8fcb4 --- /dev/null +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/etl/ArangoVIpToMap.java @@ -0,0 +1,48 @@ +package cn.ac.iie.etl; + +import cn.ac.iie.dao.BaseArangoData; +import cn.ac.iie.utils.ArangoDBConnect; +import com.arangodb.ArangoCursor; +import com.arangodb.entity.BaseDocument; + +import java.util.List; + +public class ArangoVIpToMap implements Runnable { + + private ArangoDBConnect arangoDBConnect; + private long finalMinTime; + private long diffTime; + private int threadNumber; + + private ArangoVIpToMap() {} + + public ArangoVIpToMap(ArangoDBConnect arangoDBConnect, long finalMinTime, long diffTime, int threadNumber) { + this.arangoDBConnect = arangoDBConnect; + this.finalMinTime = finalMinTime; + this.diffTime = diffTime; + this.threadNumber = threadNumber; + } + + public void run() { + String name = Thread.currentThread().getName(); + long maxThreadTime = finalMinTime + (threadNumber + 1) * diffTime; + long minThreadTime = finalMinTime + threadNumber * diffTime; + String query = "FOR doc IN V_IP filter doc.FIRST_FOUND_TIME >= " + minThreadTime + " and doc.FIRST_FOUND_TIME <= " + maxThreadTime + " RETURN doc"; + System.out.println(name + ":" + query); + long s = System.currentTimeMillis(); + ArangoCursor docs = arangoDBConnect.executorQuery(query, BaseDocument.class); + if (docs != null){ + List baseDocuments = docs.asListRemaining(); + int i = 0; + for (BaseDocument doc : baseDocuments) { + String key = doc.getKey(); + BaseArangoData.v_Ip_Map.put(key, doc); + i++; + } + System.out.println(name + ":共处理数据" + i); + long l = System.currentTimeMillis(); + System.out.println(name + "运行时间:" + (l - s)); + } + } + +} diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/UpdateEFqdnAddressIp.java b/ip-learning-java-test/src/main/java/cn/ac/iie/etl/UpdateEFqdnAddressIp.java new file mode 100644 index 0000000..ce1a9ba --- /dev/null +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/etl/UpdateEFqdnAddressIp.java @@ -0,0 +1,58 @@ +package cn.ac.iie.etl; + +import cn.ac.iie.config.ApplicationConfig; +import cn.ac.iie.dao.BaseArangoData; +import cn.ac.iie.utils.ArangoDBConnect; +import com.arangodb.entity.BaseDocument; +import com.arangodb.entity.BaseEdgeDocument; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Set; + +public class UpdateEFqdnAddressIp implements Runnable { + private HashMap documentHashMap; + + private static final ArangoDBConnect arangoManger = ArangoDBConnect.getInstance(); + + public UpdateEFqdnAddressIp(HashMap documentHashMap) { + this.documentHashMap = documentHashMap; + } + @Override + public void run() { + Set keySet = documentHashMap.keySet(); + ArrayList docInsert = new ArrayList<>(); + ArrayList docUpdate = new ArrayList<>(); + int i = 0; + try { + for (String key:keySet){ + BaseEdgeDocument newEdgeDocument = documentHashMap.getOrDefault(key, null); + if (newEdgeDocument != null){ + i += 1; + BaseEdgeDocument edgeDocument = BaseArangoData.e_Fqdn_Address_Ip_Map.getOrDefault(key, null); + if (edgeDocument != null){ + Object lastFoundTime = newEdgeDocument.getAttribute("LAST_FOUND_TIME"); + long countTotal = Long.parseLong(newEdgeDocument.getAttribute("COUNT_TOTAL").toString()); + long updateCountTotal = Long.parseLong(edgeDocument.getAttribute("COUNT_TOTAL").toString()); + edgeDocument.addAttribute("LAST_FOUND_TIME",lastFoundTime); + edgeDocument.addAttribute("COUNT_TOTAL",countTotal+updateCountTotal); + docInsert.add(edgeDocument); + }else { + docUpdate.add(newEdgeDocument); + } + } + if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH){ + arangoManger.insertAndUpdate(docInsert,docUpdate,"E_ADDRESS_V_FQDN_TO_V_IP"); + System.out.println("更新"+i); + i = 0; + } + } + if (i != 0){ + arangoManger.insertAndUpdate(docInsert,docUpdate,"E_ADDRESS_V_FQDN_TO_V_IP"); + System.out.println("更新"+i); + } + }catch (Exception e){ + e.printStackTrace(); + } + } +} diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/UpdateEIpVisitFqdn.java b/ip-learning-java-test/src/main/java/cn/ac/iie/etl/UpdateEIpVisitFqdn.java new file mode 100644 index 0000000..a07dadf --- /dev/null +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/etl/UpdateEIpVisitFqdn.java @@ -0,0 +1,58 @@ +package cn.ac.iie.etl; + +import cn.ac.iie.config.ApplicationConfig; +import cn.ac.iie.dao.BaseArangoData; +import cn.ac.iie.utils.ArangoDBConnect; +import com.arangodb.entity.BaseDocument; +import com.arangodb.entity.BaseEdgeDocument; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Set; + +public class UpdateEIpVisitFqdn implements Runnable { + private HashMap documentHashMap; + + private static final ArangoDBConnect arangoManger = ArangoDBConnect.getInstance(); + + public UpdateEIpVisitFqdn(HashMap documentHashMap) { + this.documentHashMap = documentHashMap; + } + @Override + public void run() { + Set keySet = documentHashMap.keySet(); + ArrayList docInsert = new ArrayList<>(); + ArrayList docUpdate = new ArrayList<>(); + int i = 0; + try { + for (String key:keySet){ + BaseEdgeDocument newEdgeDocument = documentHashMap.getOrDefault(key, null); + if (newEdgeDocument != null){ + i += 1; + BaseEdgeDocument edgeDocument = BaseArangoData.e_Ip_Visit_Fqdn_Map.getOrDefault(key, null); + if (edgeDocument != null){ + Object lastFoundTime = newEdgeDocument.getAttribute("LAST_FOUND_TIME"); + long countTotal = Long.parseLong(newEdgeDocument.getAttribute("COUNT_TOTAL").toString()); + long updateCountTotal = Long.parseLong(edgeDocument.getAttribute("COUNT_TOTAL").toString()); + edgeDocument.addAttribute("LAST_FOUND_TIME",lastFoundTime); + edgeDocument.addAttribute("COUNT_TOTAL",countTotal+updateCountTotal); + docInsert.add(edgeDocument); + }else { + docUpdate.add(newEdgeDocument); + } + } + if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH){ + arangoManger.insertAndUpdate(docInsert,docUpdate,"E_VISIT_V_IP_TO_V_FQDN"); + System.out.println("更新"+i); + i = 0; + } + } + if (i != 0){ + arangoManger.insertAndUpdate(docInsert,docUpdate,"E_VISIT_V_IP_TO_V_FQDN"); + System.out.println("更新"+i); + } + }catch (Exception e){ + e.printStackTrace(); + } + } +} diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/UpdateGraphsData.java b/ip-learning-java-test/src/main/java/cn/ac/iie/etl/UpdateGraphsData.java new file mode 100644 index 0000000..c6e314a --- /dev/null +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/etl/UpdateGraphsData.java @@ -0,0 +1,214 @@ +package cn.ac.iie.etl; + +import cn.ac.iie.config.ApplicationConfig; +import cn.ac.iie.dao.BaseArangoData; +import cn.ac.iie.dao.BaseClickhouseData; +import cn.ac.iie.utils.ArangoDBConnect; +import cn.ac.iie.utils.ClickhouseConnect; +import com.arangodb.entity.BaseDocument; +import com.arangodb.entity.BaseEdgeDocument; + +import java.sql.ResultSet; +import java.util.ArrayList; + +public class UpdateGraphsData { + + private static final BaseClickhouseData baseClickhouseData = new BaseClickhouseData(); + + private static final ClickhouseConnect clickhouseManger = ClickhouseConnect.getInstance(); + + private static final ArangoDBConnect arangoManger = ArangoDBConnect.getInstance(); + + public static void updateVFqdn(){ + ResultSet resultSet = baseClickhouseData.BaseRealTimeVFqdn(); + try { + System.out.println("读取clickhouse成功"); + ArrayList docInsert = new ArrayList<>(); + ArrayList docUpdate = new ArrayList<>(); + int i = 0; + System.out.println("开始更新V_FQDN"); + while (resultSet.next()){ + i += 1; + String fqdnName = resultSet.getString("FQDN_NAME"); + long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); + long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); + long fqdnCountTotal = resultSet.getLong("FQDN_COUNT_TOTAL"); + BaseDocument document = BaseArangoData.v_Fqdn_Map.getOrDefault(fqdnName, null); + if (document != null){ + long countTotal = Long.parseLong(document.getAttribute("FQDN_COUNT_TOTAL").toString()); + document.addAttribute("LAST_FOUND_TIME",lastFoundTime); + document.addAttribute("FQDN_COUNT_TOTAL",countTotal+fqdnCountTotal); + docUpdate.add(document); + }else { + BaseDocument newDoc = new BaseDocument(); + newDoc.setKey(fqdnName); + newDoc.addAttribute("FQDN_NAME",fqdnName); + newDoc.addAttribute("FIRST_FOUND_TIME",firstFoundTime); + newDoc.addAttribute("LAST_FOUND_TIME",lastFoundTime); + newDoc.addAttribute("FQDN_COUNT_TOTAL",fqdnCountTotal); + docInsert.add(newDoc); + } + if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH){ + arangoManger.insertAndUpdate(docInsert,docUpdate,"V_FQDN"); + System.out.println("更新"+i); + i = 0; + } + } + if (i != 0){ + arangoManger.insertAndUpdate(docInsert,docUpdate,"V_FQDN"); + System.out.println("更新"+i); + } + }catch (Exception e){ + e.printStackTrace(); + }finally { + clickhouseManger.clear(baseClickhouseData.pstm,baseClickhouseData.connection); + } + } + + public static void updateVIp(){ + ResultSet resultSet = baseClickhouseData.BaseRealTimeVIp(); + try { + System.out.println("读取clickhouse成功"); + ArrayList docInsert = new ArrayList<>(); + ArrayList docUpdate = new ArrayList<>(); + int i = 0; + System.out.println("开始更新V_IP"); + while (resultSet.next()){ + i += 1; + String ip = resultSet.getString("IP"); + String location = resultSet.getString("location"); + long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); + long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); + long ipCountTotal = resultSet.getLong("IP_COUNT_TOTAL"); + BaseDocument document = BaseArangoData.v_Ip_Map.getOrDefault(ip, null); + if (document != null){ + long countTotal = Long.parseLong(document.getAttribute("IP_COUNT_TOTAL").toString()); + document.addAttribute("LAST_FOUND_TIME",lastFoundTime); + document.addAttribute("IP_COUNT_TOTAL",countTotal+ipCountTotal); + docUpdate.add(document); + }else { + BaseDocument newDoc = new BaseDocument(); + newDoc.setKey(ip); + newDoc.addAttribute("IP",ip); + newDoc.addAttribute("IP_LOCATION",location); + newDoc.addAttribute("FIRST_FOUND_TIME",firstFoundTime); + newDoc.addAttribute("LAST_FOUND_TIME",lastFoundTime); + newDoc.addAttribute("IP_COUNT_TOTAL",ipCountTotal); + docInsert.add(newDoc); + } + if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH){ + arangoManger.insertAndUpdate(docInsert,docUpdate,"V_IP"); + System.out.println("更新"+i); + i = 0; + } + } + if (i != 0){ + arangoManger.insertAndUpdate(docInsert,docUpdate,"V_IP"); + System.out.println("更新"+i); + } + }catch (Exception e){ + e.printStackTrace(); + }finally { + clickhouseManger.clear(baseClickhouseData.pstm,baseClickhouseData.connection); + } + } + + public static void updateEFqdnAddressIp(){ + ResultSet resultSet = baseClickhouseData.BaseReadTimeEFqdnAddressIp(); + try { + System.out.println("读取clickhouse成功"); + ArrayList docInsert = new ArrayList<>(); + ArrayList docUpdate = new ArrayList<>(); + int i = 0; + System.out.println("开始更新E_ADDRESS_V_FQDN_TO_V_IP"); + while (resultSet.next()){ + i += 1; + String vFqdn = resultSet.getString("V_FQDN"); + String vIp = resultSet.getString("V_IP"); + long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); + long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); + long countTotal = resultSet.getLong("COUNT_TOTAL"); + String key = vFqdn+"-"+vIp; + BaseEdgeDocument edgeDocument = BaseArangoData.e_Fqdn_Address_Ip_Map.getOrDefault(key, null); + if (edgeDocument != null){ + long updateCountTotal = Long.parseLong(edgeDocument.getAttribute("COUNT_TOTAL").toString()); + edgeDocument.addAttribute("LAST_FOUND_TIME",lastFoundTime); + edgeDocument.addAttribute("COUNT_TOTAL",countTotal+updateCountTotal); + docUpdate.add(edgeDocument); + }else { + BaseEdgeDocument newDoc = new BaseEdgeDocument(); + newDoc.setKey(key); + newDoc.setFrom("V_FQDN/"+vFqdn); + newDoc.setTo("V_IP/"+vIp); + newDoc.addAttribute("FIRST_FOUND_TIME",firstFoundTime); + newDoc.addAttribute("LAST_FOUND_TIME",lastFoundTime); + newDoc.addAttribute("COUNT_TOTAL",countTotal); + docInsert.add(newDoc); + } + if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH){ + arangoManger.insertAndUpdate(docInsert,docUpdate,"E_ADDRESS_V_FQDN_TO_V_IP"); + System.out.println("更新"+i); + i = 0; + } + } + if (i != 0){ + arangoManger.insertAndUpdate(docInsert,docUpdate,"E_ADDRESS_V_FQDN_TO_V_IP"); + System.out.println("更新"+i); + } + }catch (Exception e){ + e.printStackTrace(); + }finally { + clickhouseManger.clear(baseClickhouseData.pstm,baseClickhouseData.connection); + } + } + + public static void updateEIpVisitFqdn(){ + ResultSet resultSet = baseClickhouseData.BaseRealTimeEIpVisitFqdn(); + try { + System.out.println("读取clickhouse成功"); + ArrayList docInsert = new ArrayList<>(); + ArrayList docUpdate = new ArrayList<>(); + int i = 0; + System.out.println("开始更新E_VISIT_V_IP_TO_V_FQDN"); + while (resultSet.next()){ + i += 1; + String vIp = resultSet.getString("V_IP"); + String vFqdn = resultSet.getString("V_FQDN"); + long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); + long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); + long countTotal = resultSet.getLong("COUNT_TOTAL"); + String key = vIp +"-"+ vFqdn; + BaseEdgeDocument edgeDocument = BaseArangoData.e_Ip_Visit_Fqdn_Map.getOrDefault(key, null); + if (edgeDocument != null){ + long updateCountTotal = Long.parseLong(edgeDocument.getAttribute("COUNT_TOTAL").toString()); + edgeDocument.addAttribute("LAST_FOUND_TIME",lastFoundTime); + edgeDocument.addAttribute("COUNT_TOTAL",countTotal+updateCountTotal); + docUpdate.add(edgeDocument); + }else { + BaseEdgeDocument newDoc = new BaseEdgeDocument(); + newDoc.setKey(key); + newDoc.setFrom("V_IP/"+vIp); + newDoc.setTo("V_FQDN/"+vFqdn); + newDoc.addAttribute("FIRST_FOUND_TIME",firstFoundTime); + newDoc.addAttribute("LAST_FOUND_TIME",lastFoundTime); + newDoc.addAttribute("COUNT_TOTAL",countTotal); + docInsert.add(newDoc); + } + if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH){ + arangoManger.insertAndUpdate(docInsert,docUpdate,"E_VISIT_V_IP_TO_V_FQDN"); + System.out.println("更新"+i); + i = 0; + } + } + if (i != 0){ + arangoManger.insertAndUpdate(docInsert,docUpdate,"E_VISIT_V_IP_TO_V_FQDN"); + System.out.println("更新"+i); + } + }catch (Exception e){ + e.printStackTrace(); + }finally { + clickhouseManger.clear(baseClickhouseData.pstm,baseClickhouseData.connection); + } + } + +} diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/UpdateVFqdn.java b/ip-learning-java-test/src/main/java/cn/ac/iie/etl/UpdateVFqdn.java new file mode 100644 index 0000000..eba800f --- /dev/null +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/etl/UpdateVFqdn.java @@ -0,0 +1,60 @@ +package cn.ac.iie.etl; + +import cn.ac.iie.config.ApplicationConfig; +import cn.ac.iie.dao.BaseArangoData; +import cn.ac.iie.utils.ArangoDBConnect; +import com.arangodb.entity.BaseDocument; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Set; + +public class UpdateVFqdn implements Runnable{ + + private HashMap documentHashMap; + + private static final ArangoDBConnect arangoManger = ArangoDBConnect.getInstance(); + + public UpdateVFqdn(HashMap documentHashMap) { + this.documentHashMap = documentHashMap; + } + + @Override + public void run() { + Set keySet = documentHashMap.keySet(); + ArrayList docInsert = new ArrayList<>(); + ArrayList docUpdate = new ArrayList<>(); + int i = 0; + try { + for (String key:keySet){ + BaseDocument newDocument = documentHashMap.getOrDefault(key, null); + if (newDocument != null){ + i += 1; + BaseDocument document = BaseArangoData.v_Fqdn_Map.getOrDefault(key, null); + if (document != null){ + Object lastFoundTime = newDocument.getAttribute("LAST_FOUND_TIME"); + long fqdnCountTotal = Long.parseLong(newDocument.getAttribute("FQDN_COUNT_TOTAL").toString()); + long countTotal = Long.parseLong(document.getAttribute("FQDN_COUNT_TOTAL").toString()); + document.addAttribute("LAST_FOUND_TIME",lastFoundTime); + document.addAttribute("FQDN_COUNT_TOTAL",countTotal+fqdnCountTotal); + docUpdate.add(document); + }else { + docInsert.add(newDocument); + } + } + if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH){ + arangoManger.insertAndUpdate(docInsert,docUpdate,"V_FQDN"); + System.out.println("更新"+i); + i = 0; + } + } + if (i != 0){ + arangoManger.insertAndUpdate(docInsert,docUpdate,"V_FQDN"); + System.out.println("更新"+i); + } + }catch (Exception e){ + e.printStackTrace(); + } + + } +} diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/etl/UpdateVIP.java b/ip-learning-java-test/src/main/java/cn/ac/iie/etl/UpdateVIP.java new file mode 100644 index 0000000..12a906a --- /dev/null +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/etl/UpdateVIP.java @@ -0,0 +1,60 @@ +package cn.ac.iie.etl; + + +import cn.ac.iie.config.ApplicationConfig; +import cn.ac.iie.dao.BaseArangoData; +import cn.ac.iie.utils.ArangoDBConnect; +import com.arangodb.entity.BaseDocument; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Set; + +public class UpdateVIP implements Runnable { + + private HashMap documentHashMap; + + private static final ArangoDBConnect arangoManger = ArangoDBConnect.getInstance(); + + public UpdateVIP(HashMap documentHashMap) { + this.documentHashMap = documentHashMap; + } + + @Override + public void run() { + Set keySet = documentHashMap.keySet(); + ArrayList docInsert = new ArrayList<>(); + ArrayList docUpdate = new ArrayList<>(); + int i = 0; + try { + for (String key:keySet){ + BaseDocument newDocument = documentHashMap.getOrDefault(key, null); + if (newDocument != null){ + i += 1; + BaseDocument document = BaseArangoData.v_Ip_Map.getOrDefault(key, null); + if (document != null){ + Object lastFoundTime = newDocument.getAttribute("LAST_FOUND_TIME"); + long ipCountTotal = Long.parseLong(newDocument.getAttribute("IP_COUNT_TOTAL").toString()); + long countTotal = Long.parseLong(document.getAttribute("IP_COUNT_TOTAL").toString()); + document.addAttribute("LAST_FOUND_TIME",lastFoundTime); + document.addAttribute("IP_COUNT_TOTAL",countTotal+ipCountTotal); + docUpdate.add(document); + }else { + docInsert.add(newDocument); + } + } + if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH){ + arangoManger.insertAndUpdate(docInsert,docUpdate,"V_IP"); + System.out.println("更新"+i); + i = 0; + } + } + if (i != 0){ + arangoManger.insertAndUpdate(docInsert,docUpdate,"V_IP"); + System.out.println("更新"+i); + } + }catch (Exception e){ + e.printStackTrace(); + } + } +} diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java b/ip-learning-java-test/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java new file mode 100644 index 0000000..8b6bf21 --- /dev/null +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java @@ -0,0 +1,78 @@ +package cn.ac.iie.test; + +import cn.ac.iie.dao.BaseArangoData; +import cn.ac.iie.dao.BaseClickhouseData; +import cn.ac.iie.utils.ArangoDBConnect; +import cn.ac.iie.utils.ExecutorThreadPool; + +import java.util.concurrent.CountDownLatch; + +public class IpLearningApplicationTest { + + public static void main(String[] args) { + long startA = System.currentTimeMillis(); +// BaseArangoData.BaseVFqdnDataMap(); +// BaseArangoData.BaseVIpDataMap(); +// BaseArangoData.BaseEFqdnAddressIpDataMap(); +// BaseArangoData.BaseEIpVisitFqdnDataMap(); + + +// ExecutorThreadPool.shutdown(); +// ExecutorThreadPool.awaitThreadTask(); + long lastA = System.currentTimeMillis(); + System.out.println("读取ArangoDb时间:"+(lastA - startA)); + +// UpdateGraphsData.updateVFqdn(); +// UpdateGraphsData.updateVIp(); +// UpdateGraphsData.updateEFqdnAddressIp(); +// UpdateGraphsData.updateEIpVisitFqdn(); + + long startC = System.currentTimeMillis(); + CountDownLatch countDownLatch = new CountDownLatch(4); + new Thread(new Runnable() { + @Override + public void run() { + BaseClickhouseData.BaseVFqdn(); + countDownLatch.countDown(); + } + }).start(); + + new Thread(new Runnable() { + @Override + public void run() { + BaseClickhouseData.BaseVIp(); + countDownLatch.countDown(); + } + }).start(); + + new Thread(new Runnable() { + @Override + public void run() { + BaseClickhouseData.BaseEFqdnAddressIp(); + countDownLatch.countDown(); + } + }).start(); + + new Thread(new Runnable() { + @Override + public void run() { + BaseClickhouseData.BaseEIpVisitFqdn(); + countDownLatch.countDown(); + } + }).start(); + try { + countDownLatch.await(); + }catch (Exception e){ + e.printStackTrace(); + } + long lastC = System.currentTimeMillis(); + System.out.println("更新ArangoDb时间:"+(lastC - startC)); + + System.out.println(BaseArangoData.v_Fqdn_Map.size()); + System.out.println(BaseArangoData.v_Ip_Map.size()); + System.out.println(BaseArangoData.e_Fqdn_Address_Ip_Map.size()); + System.out.println(BaseArangoData.e_Ip_Visit_Fqdn_Map.size()); + + ArangoDBConnect.clean(); + } +} diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/test/ReadArangoDBThreadTest.java b/ip-learning-java-test/src/main/java/cn/ac/iie/test/ReadArangoDBThreadTest.java new file mode 100644 index 0000000..b913e6b --- /dev/null +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/test/ReadArangoDBThreadTest.java @@ -0,0 +1,95 @@ +package cn.ac.iie.test; + +import cn.ac.iie.config.ApplicationConfig; +import com.arangodb.ArangoCursor; +import com.arangodb.ArangoDB; +import com.arangodb.ArangoDatabase; +import com.arangodb.entity.BaseDocument; +import com.arangodb.model.AqlQueryOptions; +import com.arangodb.util.MapBuilder; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +public class ReadArangoDBThreadTest { + + private static ConcurrentHashMap fqdnMap = new ConcurrentHashMap(); + public static void main(String[] args) throws Exception { + final ArangoDB arangoDB = new ArangoDB.Builder() + .maxConnections(ApplicationConfig.THREAD_POOL_NUMBER) + .host(ApplicationConfig.ARANGODB_HOST, ApplicationConfig.ARANGODB_PORT) + .user(ApplicationConfig.ARANGODB_USER) + .password(ApplicationConfig.ARANGODB_PASSWORD) + .build(); + Map bindVars = new MapBuilder().get(); + AqlQueryOptions options = new AqlQueryOptions().ttl(ApplicationConfig.ARANGODB_TTL); + + String sql = "LET FQDN = (FOR doc IN V_FQDN RETURN doc) return {max_time:MAX(FQDN[*].FQDN_FIRST_FOUND_TIME),min_time:MIN(FQDN[*].FQDN_FIRST_FOUND_TIME)}"; +// String sql = "LET IP = (FOR doc IN V_IP RETURN doc) return {max_time:MAX(IP[*].FIRST_FOUND_TIME),min_time:MIN(IP[*].FIRST_FOUND_TIME)}"; + final ArangoDatabase db = arangoDB.db("insert_iplearn_index"); + long startTime = System.currentTimeMillis(); + ArangoCursor timeDoc = db.query(sql, bindVars, options, BaseDocument.class); + long maxTime =0L; + long minTime =0L; + while (timeDoc.hasNext()){ + BaseDocument doc = timeDoc.next(); + maxTime = Long.parseLong(doc.getAttribute("max_time").toString()) + ApplicationConfig.THREAD_POOL_NUMBER; + minTime = Long.parseLong(doc.getAttribute("min_time").toString()); + } + long lastTime = System.currentTimeMillis(); + System.out.println("查询最大最小时间用时:"+(lastTime-startTime)); + System.out.println(maxTime + "--" + minTime); + final long diffTime = (maxTime - minTime) / ApplicationConfig.THREAD_POOL_NUMBER; + ExecutorService pool = Executors.newFixedThreadPool(ApplicationConfig.THREAD_POOL_NUMBER); + + for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { +// long finalMaxTime = maxTime; + final long finalMinTime = minTime; + pool.execute(new Runnable() { + + public void run() { + String name = Thread.currentThread().getName(); + ArangoDatabase insert_iplearn_index = arangoDB.db("insert_iplearn_index"); + Map bindVars = new MapBuilder().get(); + AqlQueryOptions options = new AqlQueryOptions().ttl(ApplicationConfig.ARANGODB_TTL); + String[] split = name.split("-"); + Long threadNum = Long.parseLong(split[3]); + long maxThreadTime = finalMinTime + threadNum * diffTime; + long minThreadTime = finalMinTime + (threadNum-1)*diffTime; + String query = "FOR doc IN V_FQDN filter doc.FQDN_FIRST_FOUND_TIME >= "+minThreadTime+" and doc.FQDN_FIRST_FOUND_TIME <= "+maxThreadTime+" RETURN doc"; +// String query = "FOR doc IN V_IP filter doc.FIRST_FOUND_TIME >= "+minThreadTime+" and doc.FIRST_FOUND_TIME <= "+maxThreadTime+" RETURN doc"; + System.out.println(name+":"+query); + long s = System.currentTimeMillis(); + ArangoCursor fqdnDoc = insert_iplearn_index.query(query, bindVars, options, BaseDocument.class); + List baseDocuments = fqdnDoc.asListRemaining(); + int i = 0; + for (BaseDocument doc:baseDocuments){ + String key = doc.getKey(); +// System.out.println(name+":"+key); + fqdnMap.put(key,doc); + i++; + } + /* + while (fqdnDoc.hasNext()){ + BaseDocument doc = fqdnDoc.next(); + } + */ + System.out.println(name+":"+ i); + long l = System.currentTimeMillis(); + System.out.println(name+"运行时间:"+(l-s)); + } + }); + } + pool.shutdown(); + while (!pool.awaitTermination(20, TimeUnit.SECONDS)){ + + } + System.out.println(fqdnMap.size()); + arangoDB.shutdown(); + + } +} diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java b/ip-learning-java-test/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java new file mode 100644 index 0000000..f1c75f0 --- /dev/null +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java @@ -0,0 +1,84 @@ +package cn.ac.iie.utils; + +import cn.ac.iie.config.ApplicationConfig; +import com.arangodb.ArangoCollection; +import com.arangodb.ArangoCursor; +import com.arangodb.ArangoDB; +import com.arangodb.ArangoDatabase; +import com.arangodb.model.AqlQueryOptions; +import com.arangodb.util.MapBuilder; + +import java.util.ArrayList; +import java.util.Map; + +public class ArangoDBConnect { + private static ArangoDB arangoDB = null; + private static ArangoDBConnect conn = null; + static { + getArangoDB(); + } + + private static void getArangoDB(){ + arangoDB = new ArangoDB.Builder() + .maxConnections(ApplicationConfig.THREAD_POOL_NUMBER) + .host(ApplicationConfig.ARANGODB_HOST, ApplicationConfig.ARANGODB_PORT) + .user(ApplicationConfig.ARANGODB_USER) + .password(ApplicationConfig.ARANGODB_PASSWORD) + .build(); + } + + public static synchronized ArangoDBConnect getInstance(){ + if (null == conn){ + conn = new ArangoDBConnect(); + } + return conn; + } + + public ArangoDatabase getDatabase(){ + return arangoDB.db(ApplicationConfig.ARANGODB_DB_NAME); + } + + public static void clean(){ + try { + if (arangoDB != null){ + arangoDB.shutdown(); + } + }catch (Exception e){ + e.printStackTrace(); + } + } + + public ArangoCursor executorQuery(String query,Class type){ + ArangoDatabase database = getDatabase(); + Map bindVars = new MapBuilder().get(); + AqlQueryOptions options = new AqlQueryOptions().ttl(ApplicationConfig.ARANGODB_TTL); + try { + return database.query(query, bindVars, options, type); + }catch (Exception e){ + e.printStackTrace(); + return null; + }finally { + bindVars.clear(); + } + } + + public void insertAndUpdate(ArrayList docInsert,ArrayList docUpdate,String collectionName){ + ArangoDatabase database = getDatabase(); + try { + ArangoCollection collection = database.collection(collectionName); + if (!docInsert.isEmpty()){ + collection.importDocuments(docInsert); + } + if (!docUpdate.isEmpty()){ + collection.replaceDocuments(docUpdate); + } + }catch (Exception e){ + System.out.println("更新失败"); + e.printStackTrace(); + }finally { + docInsert.clear(); + docInsert.clear(); + } + } + +} diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/utils/ClickhouseConnect.java b/ip-learning-java-test/src/main/java/cn/ac/iie/utils/ClickhouseConnect.java new file mode 100644 index 0000000..5bc7ade --- /dev/null +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/utils/ClickhouseConnect.java @@ -0,0 +1,103 @@ +package cn.ac.iie.utils; + +import com.alibaba.druid.pool.DruidDataSource; +import com.alibaba.druid.pool.DruidPooledConnection; + +import java.sql.*; +import java.util.Properties; + +public class ClickhouseConnect { + private static DruidDataSource dataSource = null; + private static ClickhouseConnect dbConnect = null; + private static Properties props = new Properties(); + + static { + getDbConnect(); + } + + private static void getDbConnect() { + try { + if (dataSource == null) { + dataSource = new DruidDataSource(); + props.load(ClickhouseConnect.class.getClassLoader().getResourceAsStream("clickhouse.properties")); + //设置连接参数 + dataSource.setUrl("jdbc:clickhouse://" + props.getProperty("db.id")); + dataSource.setDriverClassName(props.getProperty("drivers")); + dataSource.setUsername(props.getProperty("mdb.user")); + dataSource.setPassword(props.getProperty("mdb.password")); + //配置初始化大小、最小、最大 + dataSource.setInitialSize(Integer.parseInt(props.getProperty("initialsize"))); + dataSource.setMinIdle(Integer.parseInt(props.getProperty("minidle"))); + dataSource.setMaxActive(Integer.parseInt(props.getProperty("maxactive"))); + //配置获取连接等待超时的时间 + dataSource.setMaxWait(30000); + //配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 + dataSource.setTimeBetweenEvictionRunsMillis(2000); + //防止过期 + dataSource.setValidationQuery("SELECT 1"); + dataSource.setTestWhileIdle(true); + dataSource.setTestOnBorrow(true); + dataSource.setKeepAlive(true); + } + } catch (Exception e) { + e.printStackTrace(); + + } + } + + /** + * 数据库连接池单例 + * + * @return dbConnect + */ + public static synchronized ClickhouseConnect getInstance() { + if (null == dbConnect) { + dbConnect = new ClickhouseConnect(); + } + return dbConnect; + } + + /** + * 返回druid数据库连接 + * + * @return 连接 + * @throws SQLException sql异常 + */ + public DruidPooledConnection getConnection() throws SQLException { + return dataSource.getConnection(); + } + + /** + * 清空PreparedStatement、Connection对象,未定义的置空。 + * + * @param pstmt PreparedStatement对象 + * @param connection Connection对象 + */ + public void clear(Statement pstmt, Connection connection) { + try { + if (pstmt != null) { + pstmt.close(); + } + if (connection != null) { + connection.close(); + } + } catch (SQLException e) { + e.printStackTrace(); + } + + } + + public ResultSet executorQuery(String query,Connection connection,Statement pstm){ +// Connection connection = null; +// Statement pstm = null; + try { + connection = getConnection(); + pstm = connection.createStatement(); + return pstm.executeQuery(query); + }catch (Exception e){ + e.printStackTrace(); + return null; + } + } + +} diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/utils/ConfigUtils.java b/ip-learning-java-test/src/main/java/cn/ac/iie/utils/ConfigUtils.java new file mode 100644 index 0000000..726b3cf --- /dev/null +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/utils/ConfigUtils.java @@ -0,0 +1,36 @@ +package cn.ac.iie.utils; + +import java.util.Properties; + +public class ConfigUtils { + private static Properties propCommon = new Properties(); + + public static String getStringProperty(String key) { + return propCommon.getProperty(key); + } + + + public static Integer getIntProperty(String key) { + return Integer.parseInt(propCommon.getProperty(key)); + } + + public static Long getLongProperty(String key) { + return Long.parseLong(propCommon.getProperty(key)); + } + + public static Boolean getBooleanProperty(String key) { + return "true".equals(propCommon.getProperty(key).toLowerCase().trim()); + } + + static { + try { + propCommon.load(ConfigUtils.class.getClassLoader().getResourceAsStream("application.properties")); + System.out.println("application.properties加载成功"); + + + } catch (Exception e) { + propCommon = null; + System.err.println("配置加载失败"); + } + } +} diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/utils/ExecutorThreadPool.java b/ip-learning-java-test/src/main/java/cn/ac/iie/utils/ExecutorThreadPool.java new file mode 100644 index 0000000..ffd5b5a --- /dev/null +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/utils/ExecutorThreadPool.java @@ -0,0 +1,55 @@ +package cn.ac.iie.utils; + +import cn.ac.iie.config.ApplicationConfig; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +public class ExecutorThreadPool { + private static ExecutorService pool = null ; + private static ExecutorThreadPool poolExecutor = null; + + static { + getThreadPool(); + } + + private static void getThreadPool(){ + pool = Executors.newFixedThreadPool(ApplicationConfig.THREAD_POOL_NUMBER); + } + + public static ExecutorThreadPool getInstance(){ + if (null == poolExecutor){ + poolExecutor = new ExecutorThreadPool(); + } + return poolExecutor; + } + + public void executor(Runnable command){ + pool.execute(command); + } + + public static void awaitThreadTask(){ + try { + while (!pool.awaitTermination(ApplicationConfig.THREAD_AWAIT_TERMINATION_TIME, TimeUnit.SECONDS)) { + System.out.println("线程池没有关闭"); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + public static void shutdown(){ + pool.shutdown(); + } + + @Deprecated + public static Long getThreadNumber(){ + String name = Thread.currentThread().getName(); + String[] split = name.split("-"); + return Long.parseLong(split[3]); + } + + + +} diff --git a/ip-learning-java-test/src/main/resources/application.properties b/ip-learning-java-test/src/main/resources/application.properties new file mode 100644 index 0000000..d6eb262 --- /dev/null +++ b/ip-learning-java-test/src/main/resources/application.properties @@ -0,0 +1,16 @@ +#arangoDB参数配置 +arangoDB.host=192.168.40.127 +arangoDB.port=8529 +arangoDB.user=root +arangoDB.password=111111 +arangoDB.DB.name=insert_iplearn_index +arangoDB.batch=100000 +arangoDB.ttl=3600 + +update.arango.batch=10000 + +thread.pool.number=5 +thread.await.termination.time=10 + +read.clickhouse.max.time=1571241660 +read.clickhouse.min.time=1571241600 \ No newline at end of file diff --git a/ip-learning-java-test/src/main/resources/clickhouse.properties b/ip-learning-java-test/src/main/resources/clickhouse.properties new file mode 100644 index 0000000..c5d89ac --- /dev/null +++ b/ip-learning-java-test/src/main/resources/clickhouse.properties @@ -0,0 +1,7 @@ +drivers=ru.yandex.clickhouse.ClickHouseDriver +db.id=192.168.40.193:8123/av_miner?socket_timeout=300000 +mdb.user=default +mdb.password=111111 +initialsize=1 +minidle=1 +maxactive=50