diff --git a/IP-learning-graph/.gitignore b/IP-learning-graph/.gitignore
new file mode 100644
index 0000000..5db5dd3
--- /dev/null
+++ b/IP-learning-graph/.gitignore
@@ -0,0 +1,9 @@
+# Created by .ignore support plugin (hsz.mobi)
+### Example user template template
+### Example user template
+
+# IntelliJ project files
+.idea
+*.iml
+target
+logs/
diff --git a/IP-learning-graph/pom.xml b/IP-learning-graph/pom.xml
new file mode 100644
index 0000000..f7e38af
--- /dev/null
+++ b/IP-learning-graph/pom.xml
@@ -0,0 +1,87 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>cn.ac.iie</groupId>
+    <artifactId>IP-learning-graph</artifactId>
+    <version>1.0-SNAPSHOT</version>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.21</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <version>1.7.21</version>
+        </dependency>
+        <dependency>
+            <groupId>ru.yandex.clickhouse</groupId>
+            <artifactId>clickhouse-jdbc</artifactId>
+            <version>0.2.4</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>druid</artifactId>
+            <version>1.1.10</version>
+        </dependency>
+        <dependency>
+            <groupId>com.typesafe</groupId>
+            <artifactId>config</artifactId>
+            <version>1.2.1</version>
+        </dependency>
+        <dependency>
+            <groupId>com.arangodb</groupId>
+            <artifactId>arangodb-java-driver</artifactId>
+            <version>5.0.4</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>2.6</version>
+                <configuration>
+                    <archive>
+                        <manifest>
+                            <mainClass>cn.ac.iie.test.IpLearningApplicationTest</mainClass>
+                        </manifest>
+                    </archive>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>8</source>
+                    <target>8</target>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/config/ApplicationConfig.java b/IP-learning-graph/src/main/java/cn/ac/iie/config/ApplicationConfig.java
new file mode 100644
index 0000000..61f7826
--- /dev/null
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/config/ApplicationConfig.java
@@ -0,0 +1,25 @@
+package cn.ac.iie.config;
+
+import cn.ac.iie.utils.ConfigUtils;
+
+public class ApplicationConfig {
+
+    public static final String ARANGODB_HOST = ConfigUtils.getStringProperty("arangoDB.host");
+    public static final Integer ARANGODB_PORT = ConfigUtils.getIntProperty("arangoDB.port");
+    public static final String ARANGODB_USER = ConfigUtils.getStringProperty("arangoDB.user");
+    public static final String ARANGODB_PASSWORD = ConfigUtils.getStringProperty("arangoDB.password");
+    public static final String ARANGODB_DB_NAME = ConfigUtils.getStringProperty("arangoDB.DB.name");
+    public static final Integer ARANGODB_TTL = ConfigUtils.getIntProperty("arangoDB.ttl");
+    public static final Integer ARANGODB_BATCH = ConfigUtils.getIntProperty("arangoDB.batch");
+
+    public static final Integer UPDATE_ARANGO_BATCH = ConfigUtils.getIntProperty("update.arango.batch");
+
+    public static final Integer THREAD_POOL_NUMBER = ConfigUtils.getIntProperty("thread.pool.number");
+    public static final Integer THREAD_AWAIT_TERMINATION_TIME = ConfigUtils.getIntProperty("thread.await.termination.time");
+
+    public static final Long READ_CLICKHOUSE_MAX_TIME = ConfigUtils.getLongProperty("read.clickhouse.max.time");
+    public static final Long READ_CLICKHOUSE_MIN_TIME = ConfigUtils.getLongProperty("read.clickhouse.min.time");
+
+}
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java
new file mode 100644
index 0000000..fed69b2
--- /dev/null
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java
@@ -0,0 +1,92 @@
+package cn.ac.iie.dao;
+
+import cn.ac.iie.config.ApplicationConfig;
+import cn.ac.iie.etl.ArangoEFqdnAddressIpToMap;
+import cn.ac.iie.etl.ArangoEIpVisitFqdnToMap;
+import cn.ac.iie.etl.ArangoVFqdnToMap;
+import cn.ac.iie.etl.ArangoVIpToMap;
+import cn.ac.iie.utils.ArangoDBConnect;
+import cn.ac.iie.utils.ExecutorThreadPool;
+import com.arangodb.ArangoCursor;
+import com.arangodb.entity.BaseDocument;
+import com.arangodb.entity.BaseEdgeDocument;
+import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ConcurrentHashMap; + +public class BaseArangoData { + private static final Logger LOG = LoggerFactory.getLogger(BaseArangoData.class); + + public static ConcurrentHashMap v_Fqdn_Map = new ConcurrentHashMap<>(); + public static ConcurrentHashMap v_Ip_Map = new ConcurrentHashMap<>(); + public static ConcurrentHashMap e_Fqdn_Address_Ip_Map = new ConcurrentHashMap<>(); + public static ConcurrentHashMap e_Ip_Visit_Fqdn_Map = new ConcurrentHashMap<>(); + + private static final ArangoDBConnect arangoDBConnect = ArangoDBConnect.getInstance(); + + private static final ExecutorThreadPool threadPool = ExecutorThreadPool.getInstance(); + + public static void BaseVFqdnDataMap() { + String sql = "LET FQDN = (FOR doc IN FQDN RETURN doc) return {max_time:MAX(FQDN[*].FIRST_FOUND_TIME),min_time:MIN(FQDN[*].FIRST_FOUND_TIME)}"; + long[] timeLimit = getTimeLimit(sql); + for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { + ArangoVFqdnToMap ArangoVFqdnToMap = new ArangoVFqdnToMap(arangoDBConnect, timeLimit[0], timeLimit[2],i); + threadPool.executor(ArangoVFqdnToMap); + } + } + + public static void BaseVIpDataMap() { + String sql = "LET IP = (FOR doc IN IP RETURN doc) return {max_time:MAX(IP[*].FIRST_FOUND_TIME),min_time:MIN(IP[*].FIRST_FOUND_TIME)}"; + long[] timeLimit = getTimeLimit(sql); + for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { + ArangoVIpToMap arangoVIpToMap = new ArangoVIpToMap(arangoDBConnect, timeLimit[0], timeLimit[2],i); + threadPool.executor(arangoVIpToMap); + } + } + + public static void BaseEFqdnAddressIpDataMap(){ + String sql = "LET e = (FOR doc IN R_LOCATE_FQDN2IP RETURN doc) return {max_time:MAX(e[*].FIRST_FOUND_TIME),min_time:MIN(e[*].FIRST_FOUND_TIME)}"; + long[] timeLimit = getTimeLimit(sql); + for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++){ + ArangoEFqdnAddressIpToMap arangoEFqdnAddressIpToMap = new ArangoEFqdnAddressIpToMap(arangoDBConnect, timeLimit[0], timeLimit[2], i); + threadPool.executor(arangoEFqdnAddressIpToMap); + } + } + + public static void BaseEIpVisitFqdnDataMap(){ + String sql = "LET e = (FOR doc IN R_VISIT_IP2FQDN RETURN doc) return {max_time:MAX(e[*].FIRST_FOUND_TIME),min_time:MIN(e[*].FIRST_FOUND_TIME)}"; + long[] timeLimit = getTimeLimit(sql); + for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++){ + ArangoEIpVisitFqdnToMap arangoEIpVisitFqdnToMap = new ArangoEIpVisitFqdnToMap(arangoDBConnect, timeLimit[0], timeLimit[2], i); + threadPool.executor(arangoEIpVisitFqdnToMap); + } + } + + private static long[] getTimeLimit(String sql) { + long minTime = 0L; + long maxTime = 0L; + long diffTime = 0L; + long startTime = System.currentTimeMillis(); + LOG.info(sql); + ArangoCursor timeDoc = arangoDBConnect.executorQuery(sql, BaseDocument.class); + try { + if (timeDoc != null){ + while (timeDoc.hasNext()) { + BaseDocument doc = timeDoc.next(); + maxTime = Long.parseLong(doc.getAttribute("max_time").toString()) + ApplicationConfig.THREAD_POOL_NUMBER; + minTime = Long.parseLong(doc.getAttribute("min_time").toString()); + } + long lastTime = System.currentTimeMillis(); + LOG.info("查询最大最小时间用时:" + (lastTime - startTime)); + diffTime = (maxTime - minTime) / ApplicationConfig.THREAD_POOL_NUMBER; + }else { + LOG.warn("获取ArangoDb时间范围为空"); + } + }catch (Exception e){ + LOG.error(e.toString()); + } + return new long[]{minTime, maxTime, diffTime}; + } + +} diff --git 
a/IP-learning-graph/src/main/java/cn/ac/iie/etl/ArangoEFqdnAddressIpToMap.java b/IP-learning-graph/src/main/java/cn/ac/iie/etl/ArangoEFqdnAddressIpToMap.java new file mode 100644 index 0000000..1c9eab3 --- /dev/null +++ b/IP-learning-graph/src/main/java/cn/ac/iie/etl/ArangoEFqdnAddressIpToMap.java @@ -0,0 +1,56 @@ +package cn.ac.iie.etl; + +import cn.ac.iie.dao.BaseArangoData; +import cn.ac.iie.utils.ArangoDBConnect; +import com.arangodb.ArangoCursor; +import com.arangodb.entity.BaseEdgeDocument; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +public class ArangoEFqdnAddressIpToMap implements Runnable{ + private static final Logger LOG = LoggerFactory.getLogger(ArangoEFqdnAddressIpToMap.class); + + private ArangoDBConnect arangoDBConnect; + private long finalMinTime; + private long diffTime; + private int threadNumber; + + private ArangoEFqdnAddressIpToMap(){} + + public ArangoEFqdnAddressIpToMap(ArangoDBConnect arangoDBConnect, long finalMinTime, long diffTime, int threadNumber) { + this.arangoDBConnect = arangoDBConnect; + this.finalMinTime = finalMinTime; + this.diffTime = diffTime; + this.threadNumber = threadNumber; + } + + public void run() { + String name = Thread.currentThread().getName(); + long maxThreadTime = finalMinTime + (threadNumber + 1) * diffTime; + long minThreadTime = finalMinTime + threadNumber * diffTime; + String query = "FOR doc IN R_LOCATE_FQDN2IP filter doc.FIRST_FOUND_TIME >= " + minThreadTime + " and doc.FIRST_FOUND_TIME <= " + maxThreadTime + " RETURN doc"; + LOG.info(name + ":" + query); + long s = System.currentTimeMillis(); + try { + ArangoCursor docs = arangoDBConnect.executorQuery(query, BaseEdgeDocument.class); + if (docs != null){ + List baseDocuments = docs.asListRemaining(); + int i = 0; + for (BaseEdgeDocument doc : baseDocuments) { + String key = doc.getKey(); + BaseArangoData.e_Fqdn_Address_Ip_Map.put(key, doc); + i++; + } + LOG.info(name + ":共处理数据" + i); + long l = System.currentTimeMillis(); + LOG.info(name + "运行时间:" + (l - s)); + }else { + LOG.warn("查询R_LOCATE_FQDN2IP异常,结果为空"); + } + }catch (Exception e){ + LOG.error(e.getMessage()); + } + } +} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/ArangoEIpVisitFqdnToMap.java b/IP-learning-graph/src/main/java/cn/ac/iie/etl/ArangoEIpVisitFqdnToMap.java new file mode 100644 index 0000000..ffd0069 --- /dev/null +++ b/IP-learning-graph/src/main/java/cn/ac/iie/etl/ArangoEIpVisitFqdnToMap.java @@ -0,0 +1,52 @@ +package cn.ac.iie.etl; + +import cn.ac.iie.dao.BaseArangoData; +import cn.ac.iie.utils.ArangoDBConnect; +import com.arangodb.ArangoCursor; +import com.arangodb.entity.BaseEdgeDocument; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +public class ArangoEIpVisitFqdnToMap implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(ArangoEIpVisitFqdnToMap.class); + private ArangoDBConnect arangoDBConnect; + private long finalMinTime; + private long diffTime; + private int threadNumber; + + private ArangoEIpVisitFqdnToMap(){} + + public ArangoEIpVisitFqdnToMap(ArangoDBConnect arangoDBConnect, long finalMinTime, long diffTime, int threadNumber) { + this.arangoDBConnect = arangoDBConnect; + this.finalMinTime = finalMinTime; + this.diffTime = diffTime; + this.threadNumber = threadNumber; + } + + public void run() { + String name = Thread.currentThread().getName(); + long maxThreadTime = finalMinTime + (threadNumber + 1) * diffTime; + long minThreadTime = finalMinTime + threadNumber * diffTime; + 
String query = "FOR doc IN R_VISIT_IP2FQDN filter doc.FIRST_FOUND_TIME >= " + minThreadTime + " and doc.FIRST_FOUND_TIME <= " + maxThreadTime + " RETURN doc"; + LOG.info(name + ":" + query); + long s = System.currentTimeMillis(); + ArangoCursor docs = arangoDBConnect.executorQuery(query, BaseEdgeDocument.class); + + if (docs != null){ + List baseDocuments = docs.asListRemaining(); + int i = 0; + for (BaseEdgeDocument doc : baseDocuments) { + String key = doc.getKey(); + BaseArangoData.e_Ip_Visit_Fqdn_Map.put(key, doc); + i++; + } + LOG.info(name + ":共处理数据" + i); + long l = System.currentTimeMillis(); + LOG.info(name + "运行时间:" + (l - s)); + }else { + LOG.warn("查询R_VISIT_IP2FQDN异常,结果为空"); + } + } +} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/ArangoVFqdnToMap.java b/IP-learning-graph/src/main/java/cn/ac/iie/etl/ArangoVFqdnToMap.java new file mode 100644 index 0000000..b7a58e2 --- /dev/null +++ b/IP-learning-graph/src/main/java/cn/ac/iie/etl/ArangoVFqdnToMap.java @@ -0,0 +1,57 @@ +package cn.ac.iie.etl; + +import cn.ac.iie.dao.BaseArangoData; +import cn.ac.iie.utils.ArangoDBConnect; +import com.arangodb.ArangoCursor; +import com.arangodb.entity.BaseDocument; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +public class ArangoVFqdnToMap implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(ArangoVFqdnToMap.class); + + private ArangoDBConnect arangoDBConnect; + private long finalMinTime; + private long diffTime; + private int threadNumber; + + private ArangoVFqdnToMap(){} + + public ArangoVFqdnToMap(ArangoDBConnect arangoDBConnect, long finalMinTime, long diffTime, int threadNumber) { + this.arangoDBConnect = arangoDBConnect; + this.finalMinTime = finalMinTime; + this.diffTime = diffTime; + this.threadNumber = threadNumber; + } + + public void run() { + String name = Thread.currentThread().getName(); + long maxThreadTime = finalMinTime + (threadNumber + 1)* diffTime; + long minThreadTime = finalMinTime + threadNumber * diffTime; + String query = "FOR doc IN FQDN filter doc.FIRST_FOUND_TIME >= "+minThreadTime+" and doc.FIRST_FOUND_TIME <= "+maxThreadTime+" RETURN doc"; + LOG.info(name+":"+query); + long s = System.currentTimeMillis(); + ArangoCursor docs = arangoDBConnect.executorQuery(query, BaseDocument.class); + + try { + if (docs != null){ + List baseDocuments = docs.asListRemaining(); + int i = 0; + for (BaseDocument doc:baseDocuments){ + String key = doc.getKey(); + BaseArangoData.v_Fqdn_Map.put(key,doc); + i++; + } + LOG.info(name+":共处理数据"+ i); + long l = System.currentTimeMillis(); + LOG.info(name+"运行时间:"+(l-s)); + }else { + LOG.warn("获取VFqdn异常,结果为空"); + } + }catch (Exception e){ + LOG.error(e.getMessage()); + } + } +} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/ArangoVIpToMap.java b/IP-learning-graph/src/main/java/cn/ac/iie/etl/ArangoVIpToMap.java new file mode 100644 index 0000000..51e311a --- /dev/null +++ b/IP-learning-graph/src/main/java/cn/ac/iie/etl/ArangoVIpToMap.java @@ -0,0 +1,57 @@ +package cn.ac.iie.etl; + +import cn.ac.iie.dao.BaseArangoData; +import cn.ac.iie.utils.ArangoDBConnect; +import com.arangodb.ArangoCursor; +import com.arangodb.entity.BaseDocument; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +public class ArangoVIpToMap implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(ArangoVIpToMap.class); + + private ArangoDBConnect arangoDBConnect; + private long finalMinTime; + private long diffTime; + 
private int threadNumber; + + private ArangoVIpToMap() {} + + public ArangoVIpToMap(ArangoDBConnect arangoDBConnect, long finalMinTime, long diffTime, int threadNumber) { + this.arangoDBConnect = arangoDBConnect; + this.finalMinTime = finalMinTime; + this.diffTime = diffTime; + this.threadNumber = threadNumber; + } + + public void run() { + String name = Thread.currentThread().getName(); + long maxThreadTime = finalMinTime + (threadNumber + 1) * diffTime; + long minThreadTime = finalMinTime + threadNumber * diffTime; + String query = "FOR doc IN IP filter doc.FIRST_FOUND_TIME >= " + minThreadTime + " and doc.FIRST_FOUND_TIME <= " + maxThreadTime + " RETURN doc"; + LOG.info(name + ":" + query); + long s = System.currentTimeMillis(); + try { + ArangoCursor docs = arangoDBConnect.executorQuery(query, BaseDocument.class); + if (docs != null){ + List baseDocuments = docs.asListRemaining(); + int i = 0; + for (BaseDocument doc : baseDocuments) { + String key = doc.getKey(); + BaseArangoData.v_Ip_Map.put(key, doc); + i++; + } + LOG.info(name + ":共处理数据" + i); + long l = System.currentTimeMillis(); + LOG.info(name + "运行时间:" + (l - s)); + }else { + LOG.warn("获取VIP异常,结果为空"); + } + }catch (Exception e){ + LOG.error(e.getMessage()); + } + } + +} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/UpdateEFqdnAddressIp.java b/IP-learning-graph/src/main/java/cn/ac/iie/etl/UpdateEFqdnAddressIp.java new file mode 100644 index 0000000..214f035 --- /dev/null +++ b/IP-learning-graph/src/main/java/cn/ac/iie/etl/UpdateEFqdnAddressIp.java @@ -0,0 +1,113 @@ +package cn.ac.iie.etl; + +import cn.ac.iie.config.ApplicationConfig; +import cn.ac.iie.dao.BaseArangoData; +import cn.ac.iie.utils.ArangoDBConnect; +import com.arangodb.entity.BaseEdgeDocument; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +public class UpdateEFqdnAddressIp implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(UpdateEFqdnAddressIp.class); + private HashMap documentHashMap; + + private static final ArangoDBConnect arangoManger = ArangoDBConnect.getInstance(); + + public UpdateEFqdnAddressIp(HashMap documentHashMap) { + this.documentHashMap = documentHashMap; + } + + @Override + public void run() { + Set keySet = documentHashMap.keySet(); + ArrayList docInsert = new ArrayList<>(); + ArrayList docUpdate = new ArrayList<>(); + int i = 0; + try { + for (String key : keySet) { + BaseEdgeDocument newEdgeDocument = documentHashMap.getOrDefault(key, null); + if (newEdgeDocument != null) { + i += 1; + BaseEdgeDocument edgeDocument = BaseArangoData.e_Fqdn_Address_Ip_Map.getOrDefault(key, null); + + Object lastFoundTime = newEdgeDocument.getAttribute("LAST_FOUND_TIME"); + long tlsCountTotal = Long.parseLong(newEdgeDocument.getAttribute("TLS_CNT_TOTAL").toString()); + long httpCountTotal = Long.parseLong(newEdgeDocument.getAttribute("HTTP_CNT_TOTAL").toString()); + + if (edgeDocument != null) { + long tlsUpdateCountTotal = Long.parseLong(edgeDocument.getAttribute("TLS_CNT_TOTAL").toString()); + long httpUpdateCountTotal = Long.parseLong(edgeDocument.getAttribute("HTTP_CNT_TOTAL").toString()); + + edgeDocument.addAttribute("LAST_FOUND_TIME", lastFoundTime); + edgeDocument.addAttribute("TLS_CNT_TOTAL", tlsCountTotal + tlsUpdateCountTotal); + edgeDocument.addAttribute("HTTP_CNT_TOTAL", httpCountTotal + httpUpdateCountTotal); + + ArrayList tlsCntRecent = (ArrayList) edgeDocument.getAttribute("TLS_CNT_RECENT"); + Long[] tlsCntRecentsSrc = tlsCntRecent.toArray(new 
Long[tlsCntRecent.size()]);
+//                        Long[] tlsCntRecentsSrc = (Long[]) edgeDocument.getAttribute("TLS_CNT_RECENT");
+                        Long[] tlsCntRecentsDst = new Long[7];
+                        // shift the 7-slot window right by one; copy at most 6 previous slots
+                        System.arraycopy(tlsCntRecentsSrc, 0, tlsCntRecentsDst, 1, Math.min(tlsCntRecentsSrc.length, tlsCntRecentsDst.length) - 1);
+                        tlsCntRecentsDst[0] = tlsCountTotal;
+                        edgeDocument.addAttribute("TLS_CNT_RECENT", tlsCntRecentsDst);
+
+                        ArrayList<Long> httpCntRecent = (ArrayList<Long>) edgeDocument.getAttribute("HTTP_CNT_RECENT");
+                        Long[] httpCntRecentsSrc = httpCntRecent.toArray(new Long[httpCntRecent.size()]);
+//                        Long[] httpCntRecentsSrc = (Long[]) edgeDocument.getAttribute("HTTP_CNT_RECENT");
+                        Long[] httpCntRecentsDst = new Long[7];
+                        System.arraycopy(httpCntRecentsSrc, 0, httpCntRecentsDst, 1, Math.min(httpCntRecentsSrc.length, httpCntRecentsDst.length) - 1);
+                        httpCntRecentsDst[0] = httpCountTotal;
+                        edgeDocument.addAttribute("HTTP_CNT_RECENT", httpCntRecentsDst);
+
+                        ArrayList<String> distCipTotal = (ArrayList<String>) edgeDocument.getAttribute("DIST_CIP_TOTAL");
+                        String[] distCipTotalsSrc = distCipTotal.toArray(new String[distCipTotal.size()]);
+//                        String[] distCipTotalsSrc = (String[]) edgeDocument.getAttribute("DIST_CIP_TOTAL");
+
+//                        ArrayList distCipRecent = (ArrayList) newEdgeDocument.getAttribute("DIST_CIP_RECENT");
+//                        String[] distCipRecentsSrc = distCipRecent.toArray(new String[distCipRecent.size()]);
+                        String[] distCipRecentsSrc = (String[]) newEdgeDocument.getAttribute("DIST_CIP_RECENT");
+
+                        if (distCipTotalsSrc.length == 30) {
+                            HashSet<String> dIpSet = new HashSet<>();
+                            dIpSet.addAll(Arrays.asList(distCipRecentsSrc));
+                            dIpSet.addAll(Arrays.asList(distCipTotalsSrc));
+                            Object[] distCipTotals = dIpSet.toArray();
+                            if (distCipTotals.length > 30) {
+                                // keep only the first 30 distinct client IPs
+                                distCipTotals = Arrays.copyOf(distCipTotals, 30);
+                            }
+                            edgeDocument.addAttribute("DIST_CIP_TOTAL", distCipTotals);
+                        }
+                        edgeDocument.addAttribute("DIST_CIP_RECENT", distCipRecentsSrc);
+
+//                        docUpdate.add(edgeDocument);
+                        docInsert.add(edgeDocument);
+                    } else {
+                        long[] tlsCntRecentsDst = new long[7];
+                        tlsCntRecentsDst[0] = tlsCountTotal;
+                        newEdgeDocument.addAttribute("TLS_CNT_RECENT", tlsCntRecentsDst);
+
+                        long[] httpCntRecentsDst = new long[7];
+                        httpCntRecentsDst[0] = httpCountTotal;
+                        newEdgeDocument.addAttribute("HTTP_CNT_RECENT", httpCntRecentsDst);
+
+                        docInsert.add(newEdgeDocument);
+                    }
+                    if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) {
+//                        arangoManger.insertAndUpdate(docInsert, docUpdate, "R_LOCATE_FQDN2IP");
+                        arangoManger.overwrite(docInsert, "R_LOCATE_FQDN2IP");
+                        LOG.info("更新R_LOCATE_FQDN2IP:" + i);
+                        i = 0;
+                    }
+                }
+            }
+            if (i != 0) {
+//                arangoManger.insertAndUpdate(docInsert, docUpdate, "R_LOCATE_FQDN2IP");
+                arangoManger.overwrite(docInsert, "R_LOCATE_FQDN2IP");
+                LOG.info("更新R_LOCATE_FQDN2IP:" + i);
+            }
+        } catch (Exception e) {
+            LOG.error(e.getMessage());
+        }
+    }
+}
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/UpdateEIpVisitFqdn.java b/IP-learning-graph/src/main/java/cn/ac/iie/etl/UpdateEIpVisitFqdn.java
new file mode 100644
index 0000000..19ce6b7
--- /dev/null
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/etl/UpdateEIpVisitFqdn.java
@@ -0,0 +1,98 @@
+package cn.ac.iie.etl;
+
+import cn.ac.iie.config.ApplicationConfig;
+import cn.ac.iie.dao.BaseArangoData;
+import cn.ac.iie.utils.ArangoDBConnect;
+import com.arangodb.entity.BaseEdgeDocument;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Set;
+
+public class UpdateEIpVisitFqdn implements Runnable {
+    private static final Logger LOG =
LoggerFactory.getLogger(UpdateEIpVisitFqdn.class); + private HashMap documentHashMap; + + private static final ArangoDBConnect arangoManger = ArangoDBConnect.getInstance(); + + public UpdateEIpVisitFqdn(HashMap documentHashMap) { + this.documentHashMap = documentHashMap; + } + + @Override + public void run() { + Set keySet = documentHashMap.keySet(); + ArrayList docInsert = new ArrayList<>(); + ArrayList docUpdate = new ArrayList<>(); + int i = 0; + try { + for (String key : keySet) { + + + BaseEdgeDocument newEdgeDocument = documentHashMap.getOrDefault(key, null); + if (newEdgeDocument != null) { + i += 1; + BaseEdgeDocument edgeDocument = BaseArangoData.e_Ip_Visit_Fqdn_Map.getOrDefault(key, null); + + Object lastFoundTime = newEdgeDocument.getAttribute("LAST_FOUND_TIME"); + long tlsCountTotal = Long.parseLong(newEdgeDocument.getAttribute("TLS_CNT_TOTAL").toString()); + long httpCountTotal = Long.parseLong(newEdgeDocument.getAttribute("HTTP_CNT_TOTAL").toString()); + + if (edgeDocument != null) { + long tlsUpdateCountTotal = Long.parseLong(edgeDocument.getAttribute("TLS_CNT_TOTAL").toString()); + long httpUpdateCountTotal = Long.parseLong(edgeDocument.getAttribute("HTTP_CNT_TOTAL").toString()); + + edgeDocument.addAttribute("LAST_FOUND_TIME", lastFoundTime); + edgeDocument.addAttribute("TLS_CNT_TOTAL", tlsCountTotal + tlsUpdateCountTotal); + edgeDocument.addAttribute("HTTP_CNT_TOTAL", httpCountTotal + httpUpdateCountTotal); + + ArrayList tlsCntRecent = (ArrayList) edgeDocument.getAttribute("TLS_CNT_RECENT"); + Long[] tlsCntRecentsSrc = tlsCntRecent.toArray(new Long[tlsCntRecent.size()]); +// Long[] tlsCntRecentsSrc = (Long[]) edgeDocument.getAttribute("TLS_CNT_RECENT"); + Long[] tlsCntRecentsDst = new Long[7]; + System.arraycopy(tlsCntRecentsSrc, 0, tlsCntRecentsDst, 1, tlsCntRecentsSrc.length - 1); + tlsCntRecentsDst[0] = tlsCountTotal; + edgeDocument.addAttribute("TLS_CNT_RECENT", tlsCntRecentsDst); + + ArrayList httpCntRecent = (ArrayList) edgeDocument.getAttribute("HTTP_CNT_RECENT"); + Long[] httpCntRecentsSrc = httpCntRecent.toArray(new Long[httpCntRecent.size()]); +// Long[] httpCntRecentsSrc = (Long[]) edgeDocument.getAttribute("HTTP_CNT_RECENT"); + Long[] httpCntRecentsDst = new Long[7]; + System.arraycopy(httpCntRecentsSrc, 0, httpCntRecentsDst, 1, httpCntRecentsDst.length - 1); + httpCntRecentsDst[0] = httpCountTotal; + edgeDocument.addAttribute("HTTP_CNT_RECENT", httpCntRecentsDst); + +// docUpdate.add(edgeDocument); + docInsert.add(edgeDocument); + } else { + long[] tlsCntRecentsDst = new long[7]; + tlsCntRecentsDst[0] = tlsCountTotal; + newEdgeDocument.addAttribute("TLS_CNT_RECENT", tlsCntRecentsDst); + + long[] httpCntRecentsDst = new long[7]; + httpCntRecentsDst[0] = httpCountTotal; + newEdgeDocument.addAttribute("HTTP_CNT_RECENT", httpCntRecentsDst); + + docInsert.add(newEdgeDocument); + } + if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) { +// arangoManger.insertAndUpdate(docInsert, docUpdate, "R_VISIT_IP2FQDN"); + arangoManger.overwrite(docInsert,"R_VISIT_IP2FQDN"); + LOG.info("更新R_VISIT_IP2FQDN:" + i); + i = 0; + } + } + } + if (i != 0) { +// arangoManger.insertAndUpdate(docInsert, docUpdate, "R_VISIT_IP2FQDN"); + arangoManger.overwrite(docInsert,"R_VISIT_IP2FQDN"); + LOG.info("更新R_VISIT_IP2FQDN:" + i); + } + } catch (Exception e) { + LOG.error(e.getMessage()); + } + } +} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/UpdateVFqdn.java b/IP-learning-graph/src/main/java/cn/ac/iie/etl/UpdateVFqdn.java new file mode 100644 index 0000000..2754595 --- /dev/null 
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/etl/UpdateVFqdn.java @@ -0,0 +1,62 @@ +package cn.ac.iie.etl; + +import cn.ac.iie.config.ApplicationConfig; +import cn.ac.iie.dao.BaseArangoData; +import cn.ac.iie.utils.ArangoDBConnect; +import com.arangodb.entity.BaseDocument; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Set; + +public class UpdateVFqdn implements Runnable{ + private static final Logger LOG = LoggerFactory.getLogger(UpdateVFqdn.class); + + private ArrayList documentList; + + private static final ArangoDBConnect arangoManger = ArangoDBConnect.getInstance(); + + public UpdateVFqdn(ArrayList documentList) { + this.documentList = documentList; + } + + @Override + public void run() { + ArrayList docInsert = new ArrayList<>(); + ArrayList docUpdate = new ArrayList<>(); + int i = 0; + try { + for (BaseDocument newDocument:documentList){ + String key = newDocument.getKey(); + if (!key.equals("")){ + i += 1; + BaseDocument document = BaseArangoData.v_Fqdn_Map.getOrDefault(key, null); + if (document != null){ + Object lastFoundTime = newDocument.getAttribute("LAST_FOUND_TIME"); + document.addAttribute("LAST_FOUND_TIME",lastFoundTime); +// docUpdate.add(document); + docInsert.add(document); + }else { + docInsert.add(newDocument); + } + if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH){ +// arangoManger.insertAndUpdate(docInsert,docUpdate,"FQDN"); + arangoManger.overwrite(docInsert,"FQDN"); + LOG.info("更新FQDN:"+i); + i = 0; + } + } + } + if (i != 0){ +// arangoManger.insertAndUpdate(docInsert,docUpdate,"FQDN"); + arangoManger.overwrite(docInsert,"FQDN"); + LOG.info("更新FQDN:"+i); + } + }catch (Exception e){ + LOG.error(e.getMessage()); + } + + } +} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/UpdateVIP.java b/IP-learning-graph/src/main/java/cn/ac/iie/etl/UpdateVIP.java new file mode 100644 index 0000000..9a70fca --- /dev/null +++ b/IP-learning-graph/src/main/java/cn/ac/iie/etl/UpdateVIP.java @@ -0,0 +1,60 @@ +package cn.ac.iie.etl; + + +import cn.ac.iie.config.ApplicationConfig; +import cn.ac.iie.dao.BaseArangoData; +import cn.ac.iie.utils.ArangoDBConnect; +import com.arangodb.entity.BaseDocument; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; + +public class UpdateVIP implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(UpdateVIP.class); + + private ArrayList documentList; + + private static final ArangoDBConnect arangoManger = ArangoDBConnect.getInstance(); + + public UpdateVIP(ArrayList documentList) { + this.documentList = documentList; + } + + @Override + public void run() { + ArrayList docInsert = new ArrayList<>(); + ArrayList docUpdate = new ArrayList<>(); + int i = 0; + try { + for (BaseDocument newDocument:documentList){ + String key = newDocument.getKey(); + if (!key.equals("")){ + i += 1; + BaseDocument document = BaseArangoData.v_Ip_Map.getOrDefault(key, null); + if (document != null){ + Object lastFoundTime = newDocument.getAttribute("LAST_FOUND_TIME"); + document.addAttribute("LAST_FOUND_TIME",lastFoundTime); +// docUpdate.add(document); + docInsert.add(document); + }else { + docInsert.add(newDocument); + } + if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH){ +// arangoManger.insertAndUpdate(docInsert,docUpdate,"IP"); + arangoManger.overwrite(docInsert,"IP"); + LOG.info("更新IP:"+i); + i = 0; + } + } + } + if (i != 0){ +// arangoManger.insertAndUpdate(docInsert,docUpdate,"IP"); + 
arangoManger.overwrite(docInsert,"IP"); + LOG.info("更新IP:"+i); + } + }catch (Exception e){ + LOG.error(e.getMessage()); + } + } +} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java b/IP-learning-graph/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java new file mode 100644 index 0000000..187cc7f --- /dev/null +++ b/IP-learning-graph/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java @@ -0,0 +1,76 @@ +package cn.ac.iie.test; + +import cn.ac.iie.dao.BaseArangoData; +import cn.ac.iie.dao.BaseClickhouseData; +import cn.ac.iie.utils.ArangoDBConnect; +import cn.ac.iie.utils.ExecutorThreadPool; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CountDownLatch; + +public class IpLearningApplicationTest { + private static final Logger LOG = LoggerFactory.getLogger(IpLearningApplicationTest.class); + + public static void main(String[] args) { + + long startA = System.currentTimeMillis(); + BaseArangoData.BaseVFqdnDataMap(); + BaseArangoData.BaseVIpDataMap(); + BaseArangoData.BaseEFqdnAddressIpDataMap(); + BaseArangoData.BaseEIpVisitFqdnDataMap(); + + + ExecutorThreadPool.shutdown(); + ExecutorThreadPool.awaitThreadTask(); + long lastA = System.currentTimeMillis(); + LOG.info("读取ArangoDb时间:"+(lastA - startA)); + + long startC = System.currentTimeMillis(); + + try { + CountDownLatch countDownLatch = new CountDownLatch(4); + + new Thread(() -> { + BaseClickhouseData.BaseVFqdn(); + countDownLatch.countDown(); + }).start(); + + new Thread(() -> { + BaseClickhouseData.BaseVIp(); + countDownLatch.countDown(); + }).start(); + + new Thread(() -> { + BaseClickhouseData.BaseEFqdnAddressIp(); + countDownLatch.countDown(); + }).start(); + + new Thread(() -> { + BaseClickhouseData.BaseEIpVisitFqdn(); + countDownLatch.countDown(); + }).start(); + + try { + countDownLatch.await(); + LOG.info("主线程等待完毕"); + }catch (Exception e){ + LOG.error("主线程阻塞异常:\n"+e.toString()); + } + + // BaseClickhouseData.BaseEFqdnAddressIp(); + long lastC = System.currentTimeMillis(); + LOG.info("更新ArangoDb时间:"+(lastC - startC)); + }catch (Exception e){ + e.printStackTrace(); + }finally { + ArangoDBConnect.clean(); + } + + LOG.info("v_Fqdn_Map大小:"+BaseArangoData.v_Fqdn_Map.size()); + LOG.info("v_Ip_Map大小:"+BaseArangoData.v_Ip_Map.size()); + LOG.info("e_Fqdn_Address_Ip_Map大小:"+BaseArangoData.e_Fqdn_Address_Ip_Map.size()); + LOG.info("e_Ip_Visit_Fqdn_Map大小:"+BaseArangoData.e_Ip_Visit_Fqdn_Map.size()); + + } +} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/test/ReadArangoDBThreadTest.java b/IP-learning-graph/src/main/java/cn/ac/iie/test/ReadArangoDBThreadTest.java new file mode 100644 index 0000000..b913e6b --- /dev/null +++ b/IP-learning-graph/src/main/java/cn/ac/iie/test/ReadArangoDBThreadTest.java @@ -0,0 +1,95 @@ +package cn.ac.iie.test; + +import cn.ac.iie.config.ApplicationConfig; +import com.arangodb.ArangoCursor; +import com.arangodb.ArangoDB; +import com.arangodb.ArangoDatabase; +import com.arangodb.entity.BaseDocument; +import com.arangodb.model.AqlQueryOptions; +import com.arangodb.util.MapBuilder; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +public class ReadArangoDBThreadTest { + + private static ConcurrentHashMap fqdnMap = new ConcurrentHashMap(); + public static void main(String[] args) throws Exception { + final ArangoDB arangoDB = new 
ArangoDB.Builder() + .maxConnections(ApplicationConfig.THREAD_POOL_NUMBER) + .host(ApplicationConfig.ARANGODB_HOST, ApplicationConfig.ARANGODB_PORT) + .user(ApplicationConfig.ARANGODB_USER) + .password(ApplicationConfig.ARANGODB_PASSWORD) + .build(); + Map bindVars = new MapBuilder().get(); + AqlQueryOptions options = new AqlQueryOptions().ttl(ApplicationConfig.ARANGODB_TTL); + + String sql = "LET FQDN = (FOR doc IN V_FQDN RETURN doc) return {max_time:MAX(FQDN[*].FQDN_FIRST_FOUND_TIME),min_time:MIN(FQDN[*].FQDN_FIRST_FOUND_TIME)}"; +// String sql = "LET IP = (FOR doc IN V_IP RETURN doc) return {max_time:MAX(IP[*].FIRST_FOUND_TIME),min_time:MIN(IP[*].FIRST_FOUND_TIME)}"; + final ArangoDatabase db = arangoDB.db("insert_iplearn_index"); + long startTime = System.currentTimeMillis(); + ArangoCursor timeDoc = db.query(sql, bindVars, options, BaseDocument.class); + long maxTime =0L; + long minTime =0L; + while (timeDoc.hasNext()){ + BaseDocument doc = timeDoc.next(); + maxTime = Long.parseLong(doc.getAttribute("max_time").toString()) + ApplicationConfig.THREAD_POOL_NUMBER; + minTime = Long.parseLong(doc.getAttribute("min_time").toString()); + } + long lastTime = System.currentTimeMillis(); + System.out.println("查询最大最小时间用时:"+(lastTime-startTime)); + System.out.println(maxTime + "--" + minTime); + final long diffTime = (maxTime - minTime) / ApplicationConfig.THREAD_POOL_NUMBER; + ExecutorService pool = Executors.newFixedThreadPool(ApplicationConfig.THREAD_POOL_NUMBER); + + for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { +// long finalMaxTime = maxTime; + final long finalMinTime = minTime; + pool.execute(new Runnable() { + + public void run() { + String name = Thread.currentThread().getName(); + ArangoDatabase insert_iplearn_index = arangoDB.db("insert_iplearn_index"); + Map bindVars = new MapBuilder().get(); + AqlQueryOptions options = new AqlQueryOptions().ttl(ApplicationConfig.ARANGODB_TTL); + String[] split = name.split("-"); + Long threadNum = Long.parseLong(split[3]); + long maxThreadTime = finalMinTime + threadNum * diffTime; + long minThreadTime = finalMinTime + (threadNum-1)*diffTime; + String query = "FOR doc IN V_FQDN filter doc.FQDN_FIRST_FOUND_TIME >= "+minThreadTime+" and doc.FQDN_FIRST_FOUND_TIME <= "+maxThreadTime+" RETURN doc"; +// String query = "FOR doc IN V_IP filter doc.FIRST_FOUND_TIME >= "+minThreadTime+" and doc.FIRST_FOUND_TIME <= "+maxThreadTime+" RETURN doc"; + System.out.println(name+":"+query); + long s = System.currentTimeMillis(); + ArangoCursor fqdnDoc = insert_iplearn_index.query(query, bindVars, options, BaseDocument.class); + List baseDocuments = fqdnDoc.asListRemaining(); + int i = 0; + for (BaseDocument doc:baseDocuments){ + String key = doc.getKey(); +// System.out.println(name+":"+key); + fqdnMap.put(key,doc); + i++; + } + /* + while (fqdnDoc.hasNext()){ + BaseDocument doc = fqdnDoc.next(); + } + */ + System.out.println(name+":"+ i); + long l = System.currentTimeMillis(); + System.out.println(name+"运行时间:"+(l-s)); + } + }); + } + pool.shutdown(); + while (!pool.awaitTermination(20, TimeUnit.SECONDS)){ + + } + System.out.println(fqdnMap.size()); + arangoDB.shutdown(); + + } +} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java b/IP-learning-graph/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java new file mode 100644 index 0000000..1d74aca --- /dev/null +++ b/IP-learning-graph/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java @@ -0,0 +1,116 @@ +package cn.ac.iie.utils; + +import cn.ac.iie.config.ApplicationConfig; 
+import com.arangodb.ArangoCollection;
+import com.arangodb.ArangoCursor;
+import com.arangodb.ArangoDB;
+import com.arangodb.ArangoDatabase;
+import com.arangodb.entity.DocumentCreateEntity;
+import com.arangodb.entity.ErrorEntity;
+import com.arangodb.entity.MultiDocumentEntity;
+import com.arangodb.model.AqlQueryOptions;
+import com.arangodb.model.DocumentCreateOptions;
+import com.arangodb.util.MapBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+
+public class ArangoDBConnect {
+    private static final Logger LOG = LoggerFactory.getLogger(ArangoDBConnect.class);
+    private static ArangoDB arangoDB = null;
+    private static ArangoDBConnect conn = null;
+
+    static {
+        getArangoDB();
+    }
+
+    private static void getArangoDB() {
+        arangoDB = new ArangoDB.Builder()
+                .maxConnections(ApplicationConfig.THREAD_POOL_NUMBER)
+                .host(ApplicationConfig.ARANGODB_HOST, ApplicationConfig.ARANGODB_PORT)
+                .user(ApplicationConfig.ARANGODB_USER)
+                .password(ApplicationConfig.ARANGODB_PASSWORD)
+                .build();
+    }
+
+    public static synchronized ArangoDBConnect getInstance() {
+        if (null == conn) {
+            conn = new ArangoDBConnect();
+        }
+        return conn;
+    }
+
+    public ArangoDatabase getDatabase() {
+        return arangoDB.db(ApplicationConfig.ARANGODB_DB_NAME);
+    }
+
+    public static void clean() {
+        try {
+            if (arangoDB != null) {
+                arangoDB.shutdown();
+            }
+        } catch (Exception e) {
+            LOG.error(e.getMessage());
+        }
+    }
+
+    public <T> ArangoCursor<T> executorQuery(String query, Class<T> type) {
+        ArangoDatabase database = getDatabase();
+        Map<String, Object> bindVars = new MapBuilder().get();
+        AqlQueryOptions options = new AqlQueryOptions().ttl(ApplicationConfig.ARANGODB_TTL);
+        try {
+            return database.query(query, bindVars, options, type);
+        } catch (Exception e) {
+            LOG.error(e.getMessage());
+            return null;
+        } finally {
+            bindVars.clear();
+        }
+    }
+
+    public <T> void insertAndUpdate(ArrayList<T> docInsert, ArrayList<T> docUpdate, String collectionName) {
+        ArangoDatabase database = getDatabase();
+        try {
+            ArangoCollection collection = database.collection(collectionName);
+            if (!docInsert.isEmpty()) {
+                DocumentCreateOptions documentCreateOptions = new DocumentCreateOptions();
+//                documentCreateOptions.overwrite(true);
+                collection.insertDocuments(docInsert, documentCreateOptions);
+            }
+            if (!docUpdate.isEmpty()) {
+                collection.replaceDocuments(docUpdate);
+            }
+        } catch (Exception e) {
+            LOG.error("更新失败\n" + e.toString());
+        } finally {
+            docInsert.clear();
+            docUpdate.clear();
+        }
+    }
+
+    public <T> void overwrite(ArrayList<T> docOverwrite, String collectionName) {
+        ArangoDatabase database = getDatabase();
+        try {
+            ArangoCollection collection = database.collection(collectionName);
+            if (!docOverwrite.isEmpty()) {
+                DocumentCreateOptions documentCreateOptions = new DocumentCreateOptions();
+                documentCreateOptions.overwrite(true);
+                documentCreateOptions.returnNew(true);
+                documentCreateOptions.returnOld(true);
+                MultiDocumentEntity<DocumentCreateEntity<T>> insertResult = collection.insertDocuments(docOverwrite, documentCreateOptions);
+                Collection<ErrorEntity> errors = insertResult.getErrors();
+                for (ErrorEntity errorEntity : errors) {
+                    LOG.error("写入arangoDB异常:" + errorEntity.getErrorMessage());
+                }
+            }
+        } catch (Exception e) {
+            LOG.error(e.toString());
+        } finally {
+            docOverwrite.clear();
+        }
+    }
+
+}
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/utils/ClickhouseConnect.java b/IP-learning-graph/src/main/java/cn/ac/iie/utils/ClickhouseConnect.java
new file mode
100644 index 0000000..f436192 --- /dev/null +++ b/IP-learning-graph/src/main/java/cn/ac/iie/utils/ClickhouseConnect.java @@ -0,0 +1,109 @@ +package cn.ac.iie.utils; + +import com.alibaba.druid.pool.DruidDataSource; +import com.alibaba.druid.pool.DruidPooledConnection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Properties; + +public class ClickhouseConnect { + private static final Logger LOG = LoggerFactory.getLogger(ClickhouseConnect.class); + private static DruidDataSource dataSource = null; + private static ClickhouseConnect dbConnect = null; + private static Properties props = new Properties(); + + static { + getDbConnect(); + } + + private static void getDbConnect() { + try { + if (dataSource == null) { + dataSource = new DruidDataSource(); + props.load(ClickhouseConnect.class.getClassLoader().getResourceAsStream("clickhouse.properties")); + //设置连接参数 + dataSource.setUrl("jdbc:clickhouse://" + props.getProperty("db.id")); + dataSource.setDriverClassName(props.getProperty("drivers")); + dataSource.setUsername(props.getProperty("mdb.user")); + dataSource.setPassword(props.getProperty("mdb.password")); + //配置初始化大小、最小、最大 + dataSource.setInitialSize(Integer.parseInt(props.getProperty("initialsize"))); + dataSource.setMinIdle(Integer.parseInt(props.getProperty("minidle"))); + dataSource.setMaxActive(Integer.parseInt(props.getProperty("maxactive"))); + //配置获取连接等待超时的时间 + dataSource.setMaxWait(30000); + //配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 + dataSource.setTimeBetweenEvictionRunsMillis(2000); + //防止过期 + dataSource.setValidationQuery("SELECT 1"); + dataSource.setTestWhileIdle(true); + dataSource.setTestOnBorrow(true); + dataSource.setKeepAlive(true); + } + } catch (Exception e) { + LOG.error(e.getMessage()); + + } + } + + /** + * 数据库连接池单例 + * + * @return dbConnect + */ + public static synchronized ClickhouseConnect getInstance() { + if (null == dbConnect) { + dbConnect = new ClickhouseConnect(); + } + return dbConnect; + } + + /** + * 返回druid数据库连接 + * + * @return 连接 + * @throws SQLException sql异常 + */ + public DruidPooledConnection getConnection() throws SQLException { + return dataSource.getConnection(); + } + + /** + * 清空PreparedStatement、Connection对象,未定义的置空。 + * + * @param pstmt PreparedStatement对象 + * @param connection Connection对象 + */ + public void clear(Statement pstmt, Connection connection) { + try { + if (pstmt != null) { + pstmt.close(); + } + if (connection != null) { + connection.close(); + } + } catch (SQLException e) { + LOG.error(e.getMessage()); + } + + } + + public ResultSet executorQuery(String query,Connection connection,Statement pstm){ +// Connection connection = null; +// Statement pstm = null; + try { + connection = getConnection(); + pstm = connection.createStatement(); + return pstm.executeQuery(query); + }catch (Exception e){ + LOG.error(e.getMessage()); + return null; + } + } + +} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/utils/ConfigUtils.java b/IP-learning-graph/src/main/java/cn/ac/iie/utils/ConfigUtils.java new file mode 100644 index 0000000..808ac84 --- /dev/null +++ b/IP-learning-graph/src/main/java/cn/ac/iie/utils/ConfigUtils.java @@ -0,0 +1,40 @@ +package cn.ac.iie.utils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; + +public class ConfigUtils { + private static final Logger LOG = LoggerFactory.getLogger(ConfigUtils.class); + private static Properties 
propCommon = new Properties(); + + public static String getStringProperty(String key) { + return propCommon.getProperty(key); + } + + + public static Integer getIntProperty(String key) { + return Integer.parseInt(propCommon.getProperty(key)); + } + + public static Long getLongProperty(String key) { + return Long.parseLong(propCommon.getProperty(key)); + } + + public static Boolean getBooleanProperty(String key) { + return "true".equals(propCommon.getProperty(key).toLowerCase().trim()); + } + + static { + try { + propCommon.load(ConfigUtils.class.getClassLoader().getResourceAsStream("application.properties")); + LOG.info("application.properties加载成功"); + + + } catch (Exception e) { + propCommon = null; + LOG.error("配置加载失败"); + } + } +} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/utils/ExecutorThreadPool.java b/IP-learning-graph/src/main/java/cn/ac/iie/utils/ExecutorThreadPool.java new file mode 100644 index 0000000..99ff051 --- /dev/null +++ b/IP-learning-graph/src/main/java/cn/ac/iie/utils/ExecutorThreadPool.java @@ -0,0 +1,58 @@ +package cn.ac.iie.utils; + +import cn.ac.iie.config.ApplicationConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +public class ExecutorThreadPool { + private static final Logger LOG = LoggerFactory.getLogger(ExecutorThreadPool.class); + private static ExecutorService pool = null ; + private static ExecutorThreadPool poolExecutor = null; + + static { + getThreadPool(); + } + + private static void getThreadPool(){ + pool = Executors.newFixedThreadPool(ApplicationConfig.THREAD_POOL_NUMBER); + } + + public static ExecutorThreadPool getInstance(){ + if (null == poolExecutor){ + poolExecutor = new ExecutorThreadPool(); + } + return poolExecutor; + } + + public void executor(Runnable command){ + pool.execute(command); + } + + public static void awaitThreadTask(){ + try { + while (!pool.awaitTermination(ApplicationConfig.THREAD_AWAIT_TERMINATION_TIME, TimeUnit.SECONDS)) { + LOG.warn("线程池没有关闭"); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + public static void shutdown(){ + pool.shutdown(); + } + + @Deprecated + public static Long getThreadNumber(){ + String name = Thread.currentThread().getName(); + String[] split = name.split("-"); + return Long.parseLong(split[3]); + } + + + +} diff --git a/IP-learning-graph/src/main/resources/application.properties b/IP-learning-graph/src/main/resources/application.properties new file mode 100644 index 0000000..bc1d1bd --- /dev/null +++ b/IP-learning-graph/src/main/resources/application.properties @@ -0,0 +1,17 @@ +#arangoDB参数配置 +arangoDB.host=192.168.40.182 +arangoDB.port=8529 +arangoDB.user=root +arangoDB.password=111111 +#arangoDB.DB.name=ip-learning-test +arangoDB.DB.name=ip-learning-test +arangoDB.batch=100000 +arangoDB.ttl=3600 + +update.arango.batch=10000 + +thread.pool.number=10 +thread.await.termination.time=10 + +read.clickhouse.max.time=1593162456 +read.clickhouse.min.time=1592879247 \ No newline at end of file diff --git a/IP-learning-graph/src/main/resources/clickhouse.properties b/IP-learning-graph/src/main/resources/clickhouse.properties new file mode 100644 index 0000000..1a84107 --- /dev/null +++ b/IP-learning-graph/src/main/resources/clickhouse.properties @@ -0,0 +1,9 @@ +drivers=ru.yandex.clickhouse.ClickHouseDriver +#db.id=192.168.40.224:8123/tsg_galaxy_v3?socket_timeout=300000 
+db.id=192.168.40.186:8123/tsg_galaxy_v3?socket_timeout=300000
+mdb.user=default
+#mdb.password=ceiec2019
+mdb.password=111111
+initialsize=1
+minidle=1
+maxactive=50
diff --git a/IP-learning-graph/src/main/resources/log4j.properties b/IP-learning-graph/src/main/resources/log4j.properties
new file mode 100644
index 0000000..21cea3d
--- /dev/null
+++ b/IP-learning-graph/src/main/resources/log4j.properties
@@ -0,0 +1,24 @@
+######################### logger ##############################
+log4j.logger.org.apache.http=OFF
+log4j.logger.org.apache.http.wire=OFF
+
+#Log4j
+log4j.rootLogger=info,console,file
+# console appender
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.Threshold=info
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n
+
+# daily rolling file appender
+log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.file.Threshold=info
+log4j.appender.file.encoding=UTF-8
+log4j.appender.file.Append=true
+# change the log file path to match the deployment directory
+#log4j.appender.file.file=/home/ceiec/iplearning/logs/ip-learning-application.log
+log4j.appender.file.file=./logs/ip-learning-application.log
+log4j.appender.file.DatePattern='.'yyyy-MM-dd
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+#log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n
+log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n
diff --git a/IP-learning-graph/src/test/java/cn/ac/iie/ArrayTest.java b/IP-learning-graph/src/test/java/cn/ac/iie/ArrayTest.java
new file mode 100644
index 0000000..11d0b6d
--- /dev/null
+++ b/IP-learning-graph/src/test/java/cn/ac/iie/ArrayTest.java
@@ -0,0 +1,13 @@
+package cn.ac.iie;
+
+import java.util.Arrays;
+
+public class ArrayTest {
+    public static void main(String[] args) {
+        long[] longs = {1, 2, 3, 4, 5, 6, 7};
+        long[] longs1 = new long[7];
+        System.arraycopy(longs, 0, longs1, 1, longs.length - 1);
+        longs1[0] = 8;
+        System.out.println(Arrays.toString(longs1));
+    }
+}
diff --git a/IP-learning-graph/src/test/java/cn/ac/iie/IpTest.java b/IP-learning-graph/src/test/java/cn/ac/iie/IpTest.java
new file mode 100644
index 0000000..1816e97
--- /dev/null
+++ b/IP-learning-graph/src/test/java/cn/ac/iie/IpTest.java
@@ -0,0 +1,50 @@
+package cn.ac.iie;
+
+import cn.ac.iie.config.ApplicationConfig;
+
+import java.net.Inet6Address;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.regex.Pattern;
+
+public class IpTest {
+    public static void main(String[] args) throws UnknownHostException {
+        /*
+        String ipStr = "192.168.40.152";
+        Pattern pattern = Pattern.compile("^[\\d]*$");
+        String[] split = ipStr.split("\\.");
+        for (String s:split){
+            System.out.println(s);
+            System.out.println(pattern.matcher(s).matches());
+        }
+*/
+//        String ip = "17.57.145.7";
+        String ip = "pixel.rubiconproject.com";
+//        String ip = "113.200.17.239";
+        System.out.println(ip.hashCode());
+        int hash = Math.abs(ip.hashCode());
+        int i = hash % ApplicationConfig.THREAD_POOL_NUMBER;
+        System.out.println(i);
+//        String[] ipArr = ipStr.split("\\.");
+//        long ipLong = (Long.valueOf(ipArr[0]) << 24) + (Long.valueOf(ipArr[1]) << 16) + (Long.valueOf(ipArr[2]) << 8) + (Long.valueOf(ipArr[3]));
+//        System.out.println(ipLong);
+//
+//
+//        StringBuffer ipBf = new StringBuffer();
+//        ipBf.append(ipLong >>> 24).append(".");
+//        ipBf.append((ipLong >>> 16) & 0xFF).append(".");
+//        ipBf.append((ipLong >>>
8) & 0xFF).append("."); +// ipBf.append(ipLong & 0xFF); +// String ip = ipBf.toString(); +// System.out.println(ip); +// +// System.out.println("---------------"); +// InetAddress byName = InetAddress.getByName("2001:470:19:790::38"); +// byte[] address = byName.getAddress(); +// for (byte b : address) { +// System.out.println(b & 0xFF); +// } + + + } +} diff --git a/IP-learning-graph/src/test/java/cn/ac/iie/TestMap.java b/IP-learning-graph/src/test/java/cn/ac/iie/TestMap.java new file mode 100644 index 0000000..4429909 --- /dev/null +++ b/IP-learning-graph/src/test/java/cn/ac/iie/TestMap.java @@ -0,0 +1,81 @@ +package cn.ac.iie; + +import cn.ac.iie.config.ApplicationConfig; +import cn.ac.iie.etl.UpdateEFqdnAddressIp; +import cn.ac.iie.utils.ArangoDBConnect; +import cn.ac.iie.utils.ClickhouseConnect; +import com.alibaba.druid.pool.DruidPooledConnection; +import com.arangodb.ArangoCollection; +import com.arangodb.ArangoDatabase; +import com.arangodb.entity.*; + +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.*; + +public class TestMap { + + public static void main(String[] args) { + /* + long maxTime = 1592794800; + long minTime = 1590112800; + String where = " common_recv_time >= " + minTime + " AND common_recv_time <= " + maxTime+ " AND (common_schema_type = 'HTTP' or common_schema_type = 'SSL')"; + String sql = "SELECT common_schema_type,http_host,ssl_sni,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME,COUNT(*) as COUNT_TOTAL,groupArray(30)(common_server_ip) as DIST_CIP_RECENT FROM tsg_galaxy_v3.connection_record_log WHERE "+where+" GROUP BY common_schema_type,http_host,ssl_sni"; + System.out.println(sql); + long start = System.currentTimeMillis(); + try { + DruidPooledConnection connection = ClickhouseConnect.getInstance().getConnection(); + Statement statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery(sql); + HashMap> schemaHashMap = new HashMap<>(); + while (resultSet.next()) { + String[] distCipRecents = (String[]) resultSet.getArray("DIST_CIP_RECENT").getArray(); + for (String s:distCipRecents){ + System.out.print(s+","); + } + System.out.println(); + } + + } catch (Exception e) { + e.printStackTrace(); + } + */ +// long[] longs = new long[]{1,2,3,4,5,6,7}; + /* + long[] longs = new long[]{1,2,3,4}; + long[] longs1 = new long[7]; + System.arraycopy(longs,0,longs1,1,longs.length-1); + longs1[0] = 0; + for (long c:longs1){ + System.out.println(c); + } + */ + + + String[] distCipRecents = new String[]{"2.3"}; + ArrayList baseEdgeDocuments = new ArrayList<>(); + + BaseDocument newDoc = new BaseDocument(); + newDoc.setKey("111$#$"); +// newDoc.setKey("11111"); + newDoc.addAttribute("FIRST_FOUND_TIME", 123); + newDoc.addAttribute("LAST_FOUND_TIME", 123); + + BaseDocument document = new BaseDocument(); + document.setKey("4399pk.com2142379111"); + document.addAttribute("FIRST_FOUND_TIME",1592743297); + document.addAttribute("LAST_FOUND_TIME",1592743297); + + baseEdgeDocuments.add(newDoc); + baseEdgeDocuments.add(document); + + ArangoDBConnect instance = ArangoDBConnect.getInstance(); + + instance.overwrite(baseEdgeDocuments,"FQDN"); + + ArangoDBConnect.clean(); + + + + } +}
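Reviewer sketch, not part of the patch: the class below restates, under illustrative names (IpLearningSketch, buildRangeQueries, shiftRecentCounts, mergeDistinctClientIps), two mechanisms the patch relies on. First, the per-thread FIRST_FOUND_TIME range slicing that BaseArangoData hands to the Arango*ToMap readers; note the patch also pads the queried max time by THREAD_POOL_NUMBER before dividing so integer division does not drop the newest documents. Second, the merge rules UpdateEFqdnAddressIp and UpdateEIpVisitFqdn apply to an existing edge: shift the 7-slot TLS_CNT_RECENT/HTTP_CNT_RECENT window and cap DIST_CIP_TOTAL at 30 distinct client IPs.

    import java.util.Arrays;
    import java.util.LinkedHashSet;
    import java.util.Set;

    /** Illustrative sketch only; this class does not exist in the repository. */
    public class IpLearningSketch {

        /** Build one AQL range query per worker, the way BaseArangoData splits [minTime, maxTime]. */
        static String[] buildRangeQueries(String collection, long minTime, long maxTime, int threads) {
            long diff = (maxTime - minTime) / threads;
            String[] queries = new String[threads];
            for (int t = 0; t < threads; t++) {
                long lo = minTime + t * diff;
                long hi = minTime + (t + 1) * diff;
                queries[t] = "FOR doc IN " + collection
                        + " FILTER doc.FIRST_FOUND_TIME >= " + lo
                        + " AND doc.FIRST_FOUND_TIME <= " + hi + " RETURN doc";
            }
            return queries;
        }

        /** Shift the 7-slot recent-count window right by one and store the newest total in slot 0. */
        static long[] shiftRecentCounts(long[] previous, long newestTotal) {
            long[] next = new long[7];
            System.arraycopy(previous, 0, next, 1, Math.min(previous.length, next.length - 1));
            next[0] = newestTotal;
            return next;
        }

        /** Union the stored and newly seen client IPs, keeping at most 30 distinct values. */
        static String[] mergeDistinctClientIps(String[] stored, String[] recent) {
            Set<String> union = new LinkedHashSet<>(Arrays.asList(recent));
            union.addAll(Arrays.asList(stored));
            return union.stream().limit(30).toArray(String[]::new);
        }

        public static void main(String[] args) {
            System.out.println(Arrays.toString(buildRangeQueries("FQDN", 0, 100, 4)));
            System.out.println(Arrays.toString(shiftRecentCounts(new long[]{1, 2, 3, 4, 5, 6, 7}, 8)));
            // prints [8, 1, 2, 3, 4, 5, 6], the same behaviour exercised by ArrayTest
            System.out.println(Arrays.toString(
                    mergeDistinctClientIps(new String[]{"10.0.0.1"}, new String[]{"10.0.0.2", "10.0.0.1"})));
        }
    }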