diff --git a/IP-learning-graph/pom.xml b/IP-learning-graph/pom.xml
index dd265ad..d0594b2 100644
--- a/IP-learning-graph/pom.xml
+++ b/IP-learning-graph/pom.xml
@@ -39,6 +39,12 @@
             <version>1.2.1</version>
         </dependency>
 
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+        </dependency>
+
         <dependency>
             <groupId>com.arangodb</groupId>
             <artifactId>arangodb-java-driver</artifactId>
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java
index d28519f..4375d3a 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java
@@ -58,6 +58,32 @@ public class BaseArangoData {
         }
     }
 
+    public <T extends BaseDocument> ConcurrentHashMap<Integer, ConcurrentHashMap<String, T>> readHistoryData(String table, Class<T> type){
+        ConcurrentHashMap<Integer, ConcurrentHashMap<String, T>> map = new ConcurrentHashMap<>();
+        try {
+            LOG.info("start updating "+table);
+            long start = System.currentTimeMillis();
+            for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++){
+                map.put(i, new ConcurrentHashMap<>());
+            }
+            CountDownLatch countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER);
+            Long countTotal = getCountTotal(table);
+            for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) {
+                String sql = getQuerySql(countTotal, i, table);
+                ReadHistoryArangoData<T> readHistoryArangoData =
+                        new ReadHistoryArangoData<>(arangoDBConnect, sql, map, type, table, countDownLatch);
+                threadPool.executor(readHistoryArangoData);
+            }
+            countDownLatch.await();
+            long last = System.currentTimeMillis();
+            LOG.info("reading "+table+" from arangoDB took: "+(last-start));
+        }catch (Exception e){
+            e.printStackTrace();
+            LOG.error("failed to read history data "+e.toString());
+        }
+        return map;
+    }
+
     private long[] getTimeRange(String table){
         long minTime = 0L;
         long maxTime = 0L;
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java
index 7c89a18..d0a55ed 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java
@@ -73,5 +73,51 @@ public class BaseClickhouseData {
         }
     }
 
+    public <T extends BaseDocument> HashMap<Integer, HashMap<String, ArrayList<T>>> baseDocFromCk(Supplier<String> getSqlSupplier,
+                                                                                                  Function<ResultSet, T> formatResultFunc){
+        long start = System.currentTimeMillis();
+        HashMap<Integer, HashMap<String, ArrayList<T>>> newDataMap = initializeMap();
+        if (newDataMap == null){
+            return null;
+        }
+        String sql = getSqlSupplier.get();
+        try {
+            connection = manger.getConnection();
+            statement = connection.createStatement();
+            ResultSet resultSet = statement.executeQuery(sql);
+            int i = 0;
+            while (resultSet.next()) {
+                T newDoc = formatResultFunc.apply(resultSet);
+                if (newDoc != null) {
+                    i += 1;
+                    putMapByHashcode(newDoc, newDataMap);
+                }
+            }
+            long last = System.currentTimeMillis();
+            LOG.info(sql + "\nread " + i + " rows, elapsed: " + (last - start));
+        }catch (Exception e){
+            e.printStackTrace();
+            LOG.error("failed to fetch source data "+e.toString());
+        }finally {
+            manger.clear(statement, connection);
+        }
+        return newDataMap;
+    }
+
+    private <T extends BaseDocument> HashMap<Integer, HashMap<String, ArrayList<T>>> initializeMap(){
+        try {
+            HashMap<Integer, HashMap<String, ArrayList<T>>> newDataMap = new HashMap<>();
+            for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) {
+                newDataMap.put(i, new HashMap<>());
+            }
+            return newDataMap;
+        }catch (Exception e){
+            e.printStackTrace();
+            LOG.error("failed to initialize result map "+e.toString());
+            return null;
+        }
+
+    }
+
 }
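Note: baseDocFromCk above leans on putMapByHashcode, which is defined elsewhere in BaseClickhouseData and is not touched by this diff. For the per-thread update to work, bucket i of the new ClickHouse data has to line up with bucket i of the Arango history map, so both sides must bucket by the document key in the same way. A minimal sketch of the assumed contract (the real helper may differ):

    // Sketch only -- the real putMapByHashcode is outside this diff.
    // Assumption: both the ClickHouse map and the Arango history map
    // bucket documents by _key hash modulo THREAD_POOL_NUMBER, so the
    // same key always lands in the same thread's partition.
    private <T extends BaseDocument> void putMapByHashcode(T newDoc,
            HashMap<Integer, HashMap<String, ArrayList<T>>> newDataMap) {
        String key = newDoc.getKey();   // BaseDocument#getKey, the Arango _key
        int bucket = Math.floorMod(key.hashCode(), ApplicationConfig.THREAD_POOL_NUMBER);
        newDataMap.get(bucket)
                  .computeIfAbsent(key, k -> new ArrayList<>())
                  .add(newDoc);
    }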
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java b/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java
index 12fc1bd..1572db1 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java
@@ -75,6 +75,41 @@ public class UpdateGraphData {
         }
     }
 
+    public void updateArango2(){
+        long start = System.currentTimeMillis();
+        try {
+
+            updateDocument("FQDN", Fqdn.class, BaseDocument.class,
+                    ReadClickhouseData::getVertexFqdnSql, ReadClickhouseData::getVertexFqdnDocument);
+
+            updateDocument("IP", Ip.class, BaseDocument.class,
+                    ReadClickhouseData::getVertexIpSql, ReadClickhouseData::getVertexIpDocument);
+
+            updateDocument("SUBSCRIBER", Subscriber.class, BaseDocument.class,
+                    ReadClickhouseData::getVertexSubscriberSql, ReadClickhouseData::getVertexSubscriberDocument);
+
+            updateDocument("R_LOCATE_FQDN2IP", LocateFqdn2Ip.class, BaseEdgeDocument.class,
+                    ReadClickhouseData::getRelationshipFqdnAddressIpSql, ReadClickhouseData::getRelationFqdnAddressIpDocument);
+
+//            updateDocument("R_VISIT_IP2FQDN",
+//                    VisitIp2Fqdn.class, BaseEdgeDocument.class,
+//                    ReadClickhouseData::getRelationshipIpVisitFqdnSql, ReadClickhouseData::getRelationIpVisitFqdnDocument);
+
+            updateDocument("R_LOCATE_SUBSCRIBER2IP",
+                    LocateSubscriber2Ip.class, BaseEdgeDocument.class,
+                    ReadClickhouseData::getRelationshipSubsciberLocateIpSql, ReadClickhouseData::getRelationshipSubsciberLocateIpDocument);
+
+
+            long last = System.currentTimeMillis();
+            LOG.info("iplearning application finished, elapsed: "+(last - start));
+        }catch (Exception e){
+            e.printStackTrace();
+        }finally {
+            arangoManger.clean();
+            pool.shutdown();
+        }
+    }
+
     private void updateDocument(HashMap<Integer, HashMap<String, ArrayList<BaseDocument>>> newMap,
                                 ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseDocument>> historyMap,
@@ -116,5 +116,45 @@ public class UpdateGraphData {
         }
     }
 
+    private <T extends BaseDocument> void updateDocument(String collection,
+                                                         Class<? extends Document<T>> taskType,
+                                                         Class<T> docmentType,
+                                                         Supplier<String> getSqlSupplier,
+                                                         Function<ResultSet, T> formatResultFunc){
+        ConcurrentHashMap<Integer, ConcurrentHashMap<String, T>> historyData = baseArangoData.readHistoryData(collection, docmentType);
+        LOG.info(collection+" reading clickhouse and building the result set");
+        HashMap<Integer, HashMap<String, ArrayList<T>>> newData = baseClickhouseData.baseDocFromCk(getSqlSupplier, formatResultFunc);
+        try {
+            LOG.info(collection+" starting update");
+            long start = System.currentTimeMillis();
+            CountDownLatch countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER);
+            for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++){
+                HashMap<String, ArrayList<T>> tmpNewMap = newData.get(i);
+                ConcurrentHashMap<String, T> tmpHisMap = historyData.get(i);
+                Constructor<?> constructor = taskType.getConstructor(
+                        HashMap.class,
+                        ArangoDBConnect.class,
+                        String.class,
+                        ConcurrentHashMap.class,
+                        CountDownLatch.class);
+                Document<T> docTask = (Document<T>) constructor.newInstance(tmpNewMap, arangoManger, collection, tmpHisMap, countDownLatch);
+                pool.executor(docTask);
+            }
+            countDownLatch.await();
+            long last = System.currentTimeMillis();
+            LOG.info(collection+" update finished, elapsed: "+(last-start));
+
+        }catch (Exception e){
+            e.printStackTrace();
+            LOG.error("failed to update "+collection+"!! "+e.toString());
+        }finally {
+            newData.clear();
+            historyData.clear();
+        }
+
+
+    }
+
+
 }
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java b/IP-learning-graph/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java
index 1165eee..a13e44c 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java
@@ -6,7 +6,8 @@ public class IpLearningApplicationTest {
 
     public static void main(String[] args) {
         UpdateGraphData updateGraphData = new UpdateGraphData();
-        updateGraphData.updateArango();
+//        updateGraphData.updateArango();
+        updateGraphData.updateArango2();
     }
 
 }
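The reflective getConstructor call above only succeeds if every task class passed as taskType (Fqdn, Ip, Subscriber, LocateFqdn2Ip, LocateSubscriber2Ip) declares a public constructor whose erased parameter types are exactly (HashMap, ArangoDBConnect, String, ConcurrentHashMap, CountDownLatch), in that order; otherwise updateDocument fails at runtime with NoSuchMethodException. Those classes are not part of this diff, so the following is only a sketch of the expected shape, assuming Document<T> is the project's abstract Runnable task base with a matching constructor:

    // Sketch only -- the real Fqdn task lives elsewhere in the project,
    // and Document<T> is assumed to be the abstract task base class.
    // The constructor's erased signature must match the Class[] arguments
    // passed to taskType.getConstructor(...) in updateDocument.
    public class Fqdn extends Document<BaseDocument> {
        public Fqdn(HashMap<String, ArrayList<BaseDocument>> newMap,
                    ArangoDBConnect arangoManger,
                    String collection,
                    ConcurrentHashMap<String, BaseDocument> historyMap,
                    CountDownLatch countDownLatch) {
            super(newMap, arangoManger, collection, historyMap, countDownLatch);
        }
    }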
diff --git a/IP-learning-graph/src/test/java/cn/ac/iie/TestReadArango.java b/IP-learning-graph/src/test/java/cn/ac/iie/TestReadArango.java
new file mode 100644
index 0000000..cffc50f
--- /dev/null
+++ b/IP-learning-graph/src/test/java/cn/ac/iie/TestReadArango.java
@@ -0,0 +1,53 @@
+package cn.ac.iie;
+
+import cn.ac.iie.dao.BaseArangoData;
+import cn.ac.iie.utils.ArangoDBConnect;
+import cn.ac.iie.utils.ExecutorThreadPool;
+import com.arangodb.entity.BaseDocument;
+import com.arangodb.entity.BaseEdgeDocument;
+import org.junit.After;
+import org.junit.Test;
+
+import java.util.Enumeration;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class TestReadArango {
+    private static ExecutorThreadPool pool = ExecutorThreadPool.getInstance();
+    private static ArangoDBConnect arangoManger = ArangoDBConnect.getInstance();
+
+    private static BaseArangoData baseArangoData = new BaseArangoData();
+
+
+    @Test
+    public void testReadFqdnFromArango() {
+        ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseDocument>> historyData =
+                baseArangoData.readHistoryData("FQDN", BaseDocument.class);
+        printMap(historyData);
+    }
+
+    @Test
+    public void testReadFqdnLocIpFromArango() {
+        ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseEdgeDocument>> ip =
+                baseArangoData.readHistoryData("R_LOCATE_FQDN2IP", BaseEdgeDocument.class);
+        printMap(ip);
+    }
+
+    private <T extends BaseDocument> void printMap(ConcurrentHashMap<Integer, ConcurrentHashMap<String, T>> historyData) {
+        ConcurrentHashMap<String, T> map = historyData.get(2);
+        Enumeration<String> keys = map.keys();
+        while (keys.hasMoreElements()) {
+            String key = keys.nextElement();
+            T document = map.get(key);
+            System.out.println(document.toString());
+        }
+    }
+
+
+    @After
+    public void clearSource() {
+        pool.shutdown();
+        arangoManger.clean();
+    }
+
+
+}
diff --git a/IP-learning-graph/src/test/java/cn/ac/iie/TestReadClickhouse.java b/IP-learning-graph/src/test/java/cn/ac/iie/TestReadClickhouse.java
new file mode 100644
index 0000000..ef0fe1e
--- /dev/null
+++ b/IP-learning-graph/src/test/java/cn/ac/iie/TestReadClickhouse.java
@@ -0,0 +1,46 @@
+package cn.ac.iie;
+
+import cn.ac.iie.dao.BaseClickhouseData;
+import cn.ac.iie.service.ingestion.ReadClickhouseData;
+import com.arangodb.entity.BaseDocument;
+import com.arangodb.entity.BaseEdgeDocument;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Set;
+
+public class TestReadClickhouse {
+
+    private static BaseClickhouseData baseClickhouseData = new BaseClickhouseData();
+
+    @Test
+    public void testReadFqdnFromCk(){
+
+
+        HashMap<Integer, HashMap<String, ArrayList<BaseDocument>>> newData =
+                baseClickhouseData.baseDocFromCk(ReadClickhouseData::getVertexFqdnSql,
+                        ReadClickhouseData::getVertexFqdnDocument);
+        printMap(newData);
+
+    }
+
+    @Test
+    public void testReadFqdnLocIpFromCk(){
+        HashMap<Integer, HashMap<String, ArrayList<BaseEdgeDocument>>> map =
+                baseClickhouseData.baseDocFromCk(ReadClickhouseData::getRelationshipFqdnAddressIpSql,
+                        ReadClickhouseData::getRelationFqdnAddressIpDocument);
+
+        printMap(map);
+    }
+
+
+    private <T extends BaseDocument> void printMap(HashMap<Integer, HashMap<String, ArrayList<T>>> newData){
+        HashMap<String, ArrayList<T>> map = newData.get(1);
+        Set<String> strings = map.keySet();
+        for (String key : strings){
+            ArrayList<T> baseDocuments = map.get(key);
+            System.out.println(baseDocuments.get(0));
+        }
+    }
+}
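One caveat on these tests: each printMap dumps a single hard-coded bucket (index 2 in TestReadArango, index 1 in TestReadClickhouse), which silently assumes ApplicationConfig.THREAD_POOL_NUMBER is larger than that index. A bucket-independent variant, sketched here against the same map shape, would walk every partition instead:

    // Sketch only -- iterates all partitions instead of a hard-coded bucket.
    private <T extends BaseDocument> void printAllBuckets(
            ConcurrentHashMap<Integer, ConcurrentHashMap<String, T>> historyData) {
        for (ConcurrentHashMap<String, T> bucket : historyData.values()) {
            bucket.forEach((key, document) -> System.out.println(key + " -> " + document));
        }
    }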