From db8e764e00780ba0ee5f67c9c33e7b36c4d4509f Mon Sep 17 00:00:00 2001
From: wanglihui <949764788@qq.com>
Date: Fri, 21 Aug 2020 18:08:58 +0800
Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E5=85=A8=E5=B1=80=E5=8F=98?=
=?UTF-8?q?=E9=87=8F=E4=B8=BA=E6=9C=AC=E5=9C=B0=E5=8F=98=E9=87=8F?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
IP-learning-graph/pom.xml | 6 ++
.../java/cn/ac/iie/dao/BaseArangoData.java | 26 +++++++
.../cn/ac/iie/dao/BaseClickhouseData.java | 46 ++++++++++++
.../java/cn/ac/iie/dao/UpdateGraphData.java | 75 +++++++++++++++++++
.../iie/test/IpLearningApplicationTest.java | 3 +-
.../test/java/cn/ac/iie/TestReadArango.java | 53 +++++++++++++
.../java/cn/ac/iie/TestReadClickhouse.java | 46 ++++++++++++
7 files changed, 254 insertions(+), 1 deletion(-)
create mode 100644 IP-learning-graph/src/test/java/cn/ac/iie/TestReadArango.java
create mode 100644 IP-learning-graph/src/test/java/cn/ac/iie/TestReadClickhouse.java
diff --git a/IP-learning-graph/pom.xml b/IP-learning-graph/pom.xml
index dd265ad..d0594b2 100644
--- a/IP-learning-graph/pom.xml
+++ b/IP-learning-graph/pom.xml
@@ -39,6 +39,12 @@
1.2.1
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+        </dependency>
+
com.arangodb
arangodb-java-driver
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java
index d28519f..4375d3a 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java
@@ -58,6 +58,32 @@ public class BaseArangoData {
}
}
+ public <T> ConcurrentHashMap<Integer, ConcurrentHashMap<String, T>> readHistoryData(String table, Class<T> type){ // loads existing docs of one collection, sharded by thread index
+ ConcurrentHashMap<Integer, ConcurrentHashMap<String, T>> map = new ConcurrentHashMap<>();
+ try {
+ LOG.info("开始更新"+table);
+ long start = System.currentTimeMillis();
+ for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++){
+ map.put(i,new ConcurrentHashMap<>()); // one shard per worker thread
+ }
+ CountDownLatch countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER);
+ Long countTotal = getCountTotal(table);
+ for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) {
+ String sql = getQuerySql(countTotal, i, table);
+ ReadHistoryArangoData<T> readHistoryArangoData =
+ new ReadHistoryArangoData<>(arangoDBConnect, sql, map,type,table,countDownLatch);
+ threadPool.executor(readHistoryArangoData);
+ }
+ countDownLatch.await(); // wait for all reader tasks to finish
+ long last = System.currentTimeMillis();
+ LOG.info("读取"+table+" arangoDB 共耗时:"+(last-start));
+ }catch (Exception e){
+ e.printStackTrace();
+ LOG.error("读取历史数据失败 "+e.toString());
+ }
+ return map;
+ }
+
private long[] getTimeRange(String table){
long minTime = 0L;
long maxTime = 0L;
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java
index 7c89a18..d0a55ed 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java
@@ -73,5 +73,51 @@ public class BaseClickhouseData {
}
}
+ public <T> HashMap<Integer, HashMap<String, ArrayList<T>>> baseDocFromCk(Supplier<String> getSqlSupplier, // reads Clickhouse rows, maps each to a doc, buckets by hashcode shard
+ Function<ResultSet, T> formatResultFunc){
+ long start = System.currentTimeMillis();
+ HashMap<Integer, HashMap<String, ArrayList<T>>> newDataMap = initializeMap();
+ if (newDataMap == null){
+ return null; // initialization failed; caller must handle null
+ }
+ String sql = getSqlSupplier.get();
+ try {
+ connection = manger.getConnection();
+ statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery(sql);
+ int i = 0;
+ while (resultSet.next()) {
+ T newDoc = formatResultFunc.apply(resultSet);
+ if (newDoc != null) {
+ i+=1;
+ putMapByHashcode(newDoc, newDataMap);
+ }
+ }
+ long last = System.currentTimeMillis();
+ LOG.info(sql + "\n读取"+i+"条数据,运行时间:" + (last - start));
+ }catch (Exception e){
+ e.printStackTrace();
+ LOG.error("获取原始数据失败 "+e.toString());
+ }finally {
+ manger.clear(statement,connection); // always release JDBC resources
+ }
+ return newDataMap;
+ }
+
+ private <T> HashMap<Integer, HashMap<String, ArrayList<T>>> initializeMap(){ // pre-creates one empty bucket map per worker thread
+ try {
+ HashMap<Integer, HashMap<String, ArrayList<T>>> newDataMap = new HashMap<>();
+ for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) {
+ newDataMap.put(i, new HashMap<>());
+ }
+ return newDataMap;
+ }catch (Exception e){
+ e.printStackTrace();
+ LOG.error("数据初始化失败 "+e.toString());
+ return null;
+ }
+
+ }
+
}
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java b/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java
index 12fc1bd..1572db1 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java
@@ -75,6 +75,41 @@ public class UpdateGraphData {
}
}
+ public void updateArango2(){ // refreshes every vertex/edge collection through the generic updateDocument pipeline
+ long start = System.currentTimeMillis();
+ try {
+
+ updateDocument("FQDN", Fqdn.class,BaseDocument.class, // vertex: FQDN nodes
+ ReadClickhouseData::getVertexFqdnSql,ReadClickhouseData::getVertexFqdnDocument);
+
+ updateDocument("IP", Ip.class,BaseDocument.class, // vertex: IP nodes
+ ReadClickhouseData::getVertexIpSql,ReadClickhouseData::getVertexIpDocument);
+
+ updateDocument("SUBSCRIBER", Subscriber.class,BaseDocument.class, // vertex: subscriber nodes
+ ReadClickhouseData::getVertexSubscriberSql,ReadClickhouseData::getVertexSubscriberDocument);
+
+ updateDocument("R_LOCATE_FQDN2IP", LocateFqdn2Ip.class,BaseEdgeDocument.class, // edge: FQDN located at IP
+ ReadClickhouseData::getRelationshipFqdnAddressIpSql,ReadClickhouseData::getRelationFqdnAddressIpDocument);
+
+// updateDocument("R_VISIT_IP2FQDN",
+// VisitIp2Fqdn.class,BaseEdgeDocument.class,
+// ReadClickhouseData::getRelationshipIpVisitFqdnSql,ReadClickhouseData::getRelationIpVisitFqdnDocument);
+
+ updateDocument("R_LOCATE_SUBSCRIBER2IP", // edge: subscriber located at IP
+ LocateSubscriber2Ip.class,BaseEdgeDocument.class,
+ ReadClickhouseData::getRelationshipSubsciberLocateIpSql,ReadClickhouseData::getRelationshipSubsciberLocateIpDocument);
+
+
+ long last = System.currentTimeMillis();
+ LOG.info("iplearning application运行完毕,用时:"+(last - start));
+ }catch (Exception e){
+ e.printStackTrace();
+ }finally {
+ arangoManger.clean(); // always release Arango connections
+ pool.shutdown(); // and stop the worker pool, even on failure
+ }
+ }
+
private void updateDocument(HashMap>> newMap,
ConcurrentHashMap> historyMap,
@@ -116,5 +151,45 @@ public class UpdateGraphData {
}
}
+ private <T> void updateDocument(String collection, // generic pipeline: read Arango history + CK snapshot, diff via reflected task
+ Class<? extends Document> taskType,
+ Class<T> docmentType,
+ Supplier<String> getSqlSupplier,
+ Function<ResultSet, T> formatResultFunc){
+ ConcurrentHashMap<Integer, ConcurrentHashMap<String, T>> historyData = baseArangoData.readHistoryData(collection, docmentType);
+ LOG.info(collection+" 读取clickhouse,封装结果集");
+ HashMap<Integer, HashMap<String, ArrayList<T>>> newData = baseClickhouseData.baseDocFromCk(getSqlSupplier, formatResultFunc);
+ try {
+ LOG.info(collection+" 开始更新");
+ long start = System.currentTimeMillis();
+ CountDownLatch countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER);
+ for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++){
+ HashMap<String, ArrayList<T>> tmpNewMap = newData.get(i);
+ ConcurrentHashMap<String, T> tmpHisMap = historyData.get(i);
+ Constructor<? extends Document> constructor = taskType.getConstructor( // task type must expose this 5-arg ctor
+ HashMap.class,
+ ArangoDBConnect.class,
+ String.class,
+ ConcurrentHashMap.class,
+ CountDownLatch.class);
+ Document docTask = (Document)constructor.newInstance(tmpNewMap, arangoManger, collection, tmpHisMap, countDownLatch);
+ pool.executor(docTask);
+ }
+ countDownLatch.await(); // wait for all shard tasks
+ long last = System.currentTimeMillis();
+ LOG.info(collection+" 更新完毕,共耗时:"+(last-start));
+
+ }catch (Exception e){
+ e.printStackTrace();
+ LOG.error("更新"+collection+"失败!!"+e.toString());
+ }finally {
+ newData.clear(); // drop references so the maps can be GC'd between collections
+ historyData.clear();
+ }
+
+
+ }
+
+
}
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java b/IP-learning-graph/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java
index 1165eee..a13e44c 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java
@@ -6,7 +6,8 @@ public class IpLearningApplicationTest {
public static void main(String[] args) {
UpdateGraphData updateGraphData = new UpdateGraphData();
- updateGraphData.updateArango();
+// updateGraphData.updateArango();
+ updateGraphData.updateArango2(); // switched to the refactored per-collection update path
}
}
diff --git a/IP-learning-graph/src/test/java/cn/ac/iie/TestReadArango.java b/IP-learning-graph/src/test/java/cn/ac/iie/TestReadArango.java
new file mode 100644
index 0000000..cffc50f
--- /dev/null
+++ b/IP-learning-graph/src/test/java/cn/ac/iie/TestReadArango.java
@@ -0,0 +1,53 @@
+package cn.ac.iie;
+
+import cn.ac.iie.dao.BaseArangoData;
+import cn.ac.iie.utils.ArangoDBConnect;
+import cn.ac.iie.utils.ExecutorThreadPool;
+import com.arangodb.entity.BaseDocument;
+import com.arangodb.entity.BaseEdgeDocument;
+import org.junit.After;
+import org.junit.Test;
+
+import java.util.Enumeration;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class TestReadArango { // smoke tests for reading history data out of ArangoDB
+ private static ExecutorThreadPool pool = ExecutorThreadPool.getInstance();
+ private static ArangoDBConnect arangoManger = ArangoDBConnect.getInstance();
+
+ private static BaseArangoData baseArangoData = new BaseArangoData();
+
+
+ @Test
+ public void testReadFqdnFromArango() {
+ ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseDocument>> historyData =
+ baseArangoData.readHistoryData("FQDN", BaseDocument.class);
+ printMap(historyData);
+ }
+
+ @Test
+ public void testReadFqdnLocIpFromArango() {
+ ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseEdgeDocument>> ip =
+ baseArangoData.readHistoryData("R_LOCATE_FQDN2IP", BaseEdgeDocument.class);
+ printMap(ip);
+ }
+
+ private <T> void printMap(ConcurrentHashMap<Integer, ConcurrentHashMap<String, T>> historyData) { // dumps shard 2 for a quick visual check
+ ConcurrentHashMap<String, T> map = historyData.get(2);
+ Enumeration<String> keys = map.keys();
+ while (keys.hasMoreElements()) {
+ String key = keys.nextElement();
+ T document = map.get(key);
+ System.out.println(document.toString());
+ }
+ }
+
+
+ @After
+ public void clearSource() { // release pool and Arango connections after each test
+ pool.shutdown();
+ arangoManger.clean();
+ }
+
+
+}
diff --git a/IP-learning-graph/src/test/java/cn/ac/iie/TestReadClickhouse.java b/IP-learning-graph/src/test/java/cn/ac/iie/TestReadClickhouse.java
new file mode 100644
index 0000000..ef0fe1e
--- /dev/null
+++ b/IP-learning-graph/src/test/java/cn/ac/iie/TestReadClickhouse.java
@@ -0,0 +1,46 @@
+package cn.ac.iie;
+
+import cn.ac.iie.dao.BaseClickhouseData;
+import cn.ac.iie.service.ingestion.ReadClickhouseData;
+import com.arangodb.entity.BaseDocument;
+import com.arangodb.entity.BaseEdgeDocument;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Set;
+
+public class TestReadClickhouse { // smoke tests for reading fresh data out of Clickhouse
+
+ private static BaseClickhouseData baseClickhouseData = new BaseClickhouseData();
+
+ @Test
+ public void testReadFqdnFromCk(){
+
+
+ HashMap<Integer, HashMap<String, ArrayList<BaseDocument>>> newData =
+ baseClickhouseData.baseDocFromCk(ReadClickhouseData::getVertexFqdnSql,
+ ReadClickhouseData::getVertexFqdnDocument);
+ printMap(newData);
+
+ }
+
+ @Test
+ public void testReadFqdnLocIpFromCk(){
+ HashMap<Integer, HashMap<String, ArrayList<BaseEdgeDocument>>> map =
+ baseClickhouseData.baseDocFromCk(ReadClickhouseData::getRelationshipFqdnAddressIpSql,
+ ReadClickhouseData::getRelationFqdnAddressIpDocument);
+
+ printMap(map);
+ }
+
+
+ private <T> void printMap(HashMap<Integer, HashMap<String, ArrayList<T>>> newData){ // prints first element of each bucket in shard 1
+ HashMap<String, ArrayList<T>> map = newData.get(1);
+ Set<String> strings = map.keySet();
+ for (String key:strings){
+ ArrayList<T> baseDocuments = map.get(key);
+ System.out.println(baseDocuments.get(0));
+ }
+ }
+}