diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java
deleted file mode 100644
index 841c32b..0000000
--- a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java
+++ /dev/null
@@ -1,118 +0,0 @@
-package cn.ac.iie.dao;
-
-import cn.ac.iie.config.ApplicationConfig;
-import cn.ac.iie.service.ingestion.ReadHistoryArangoData;
-import cn.ac.iie.utils.ArangoDBConnect;
-import cn.ac.iie.utils.ExecutorThreadPool;
-import com.arangodb.ArangoCursor;
-import com.arangodb.entity.BaseDocument;
-import com.arangodb.entity.BaseEdgeDocument;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-
-/**
- * Fetch ArangoDB history data
- */
-public class BaseArangoData {
-    private static final Logger LOG = LoggerFactory.getLogger(BaseArangoData.class);
-
-    static ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseDocument>> historyVertexFqdnMap = new ConcurrentHashMap<>();
-    static ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseDocument>> historyVertexIpMap = new ConcurrentHashMap<>();
-    static ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseDocument>> historyVertexSubscriberMap = new ConcurrentHashMap<>();
-    static ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseEdgeDocument>> historyRelationFqdnAddressIpMap = new ConcurrentHashMap<>();
-    static ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseEdgeDocument>> historyRelationIpVisitFqdnMap = new ConcurrentHashMap<>();
-    static ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseEdgeDocument>> historyRelationFqdnSameFqdnMap = new ConcurrentHashMap<>();
-    static ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseEdgeDocument>> historyRelationSubsciberLocateIpMap = new ConcurrentHashMap<>();
-
-    private static ArangoDBConnect arangoDBConnect = ArangoDBConnect.getInstance();
-
-    private ExecutorThreadPool threadPool = ExecutorThreadPool.getInstance();
-
-    <T extends BaseDocument> void readHistoryData(String table,
-                                                  ConcurrentHashMap<Integer, ConcurrentHashMap<String, T>> map,
-                                                  Class<T> type) {
-        try {
-            LOG.info("Start updating " + table);
-            long start = System.currentTimeMillis();
-            for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++){
-                map.put(i, new ConcurrentHashMap<>());
-            }
-            CountDownLatch countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER);
-//            long[] timeRange = getTimeRange(table);
-            Long countTotal = getCountTotal(table);
-            for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) {
-//                String sql = getQuerySql(timeRange, i, table);
-                String sql = getQuerySql(countTotal, i, table);
-                ReadHistoryArangoData<T> readHistoryArangoData =
-                        new ReadHistoryArangoData<>(arangoDBConnect, sql, map, type, table, countDownLatch);
-                threadPool.executor(readHistoryArangoData);
-            }
-            countDownLatch.await();
-            long last = System.currentTimeMillis();
-            LOG.info("Reading " + table + " from ArangoDB took: " + (last - start));
-        }catch (Exception e){
-            e.printStackTrace();
-        }
-    }
-
-    private Long getCountTotal(String table){
-        long start = System.currentTimeMillis();
-        Long cnt = 0L;
-        String sql = "RETURN LENGTH(" + table + ")";
-        try {
-            ArangoCursor<Long> longs = arangoDBConnect.executorQuery(sql, Long.class);
-            while (longs.hasNext()){
-                cnt = longs.next();
-            }
-        }catch (Exception e){
-            LOG.error(sql + " execution failed");
-        }
-        long last = System.currentTimeMillis();
-        LOG.info(sql + " result: " + cnt + " execution time: " + (last - start));
-        return cnt;
-    }
-
-    private String getQuerySql(Long cnt, int threadNumber, String table){
-        long sepNum = cnt / ApplicationConfig.THREAD_POOL_NUMBER + 1;
-        long offsetNum = threadNumber * sepNum;
-        return "FOR doc IN " + table + " limit " + offsetNum + "," + sepNum + " RETURN doc";
-    }
-
-    private long[] getTimeRange(String table){
-        long minTime = 0L;
-        long maxTime = 0L;
-        long startTime = System.currentTimeMillis();
-        String sql = "LET doc = (FOR doc IN "+table+" RETURN doc) return {max_time:MAX(doc[*].FIRST_FOUND_TIME),min_time:MIN(doc[*].FIRST_FOUND_TIME)}";
-        ArangoCursor<BaseDocument> timeDoc = arangoDBConnect.executorQuery(sql, BaseDocument.class);
-        try {
-            if (timeDoc != null){
-                while (timeDoc.hasNext()) {
-                    BaseDocument doc = timeDoc.next();
-                    maxTime = Long.parseLong(doc.getAttribute("max_time").toString()) + ApplicationConfig.THREAD_POOL_NUMBER;
-                    minTime = Long.parseLong(doc.getAttribute("min_time").toString());
-                }
-                long lastTime = System.currentTimeMillis();
-                LOG.info(sql + "\nQuerying max/min time took: " + (lastTime - startTime));
-            }else {
-                LOG.warn("ArangoDB time range query returned empty");
-            }
-        }catch (Exception e){
-            e.printStackTrace();
-        }
-        return new long[]{minTime, maxTime};
-
-    }
-
-    private String getQuerySql(long[] timeRange, int threadNumber, String table){
-        long minTime = timeRange[0];
-        long maxTime = timeRange[1];
-        long diffTime = (maxTime - minTime) / ApplicationConfig.THREAD_POOL_NUMBER;
-        long maxThreadTime = minTime + (threadNumber + 1) * diffTime;
-        long minThreadTime = minTime + threadNumber * diffTime;
-        return "FOR doc IN "+table+" filter doc.FIRST_FOUND_TIME >= "+minThreadTime+" and doc.FIRST_FOUND_TIME <= "+maxThreadTime+" " + ApplicationConfig.ARANGODB_READ_LIMIT + " RETURN doc";
-    }
-
-}
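
The deleted BaseArangoData partitioned a full-collection scan across THREAD_POOL_NUMBER workers: it counted the collection with RETURN LENGTH(table), then handed each worker a "limit offset,count" slice. A minimal sketch of that slice arithmetic, with names as in the deleted getQuerySql:

    // Slice arithmetic from the deleted getQuerySql: e.g. cnt = 10 documents and
    // 4 worker threads give sepNum = 10/4 + 1 = 3, so offsets 0, 3, 6, 9.
    static String sliceQuery(String table, long cnt, int threadNumber, int threads) {
        long sepNum = cnt / threads + 1;          // slice size, rounded up
        long offsetNum = threadNumber * sepNum;   // this worker's start offset
        return "FOR doc IN " + table + " limit " + offsetNum + "," + sepNum + " RETURN doc";
    }

Because sepNum is rounded up, the last worker's slice may extend past the end of the collection; AQL LIMIT simply returns fewer rows there, so no worker fails.
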
"+table+" RETURN doc) return {max_time:MAX(doc[*].FIRST_FOUND_TIME),min_time:MIN(doc[*].FIRST_FOUND_TIME)}"; - ArangoCursor timeDoc = arangoDBConnect.executorQuery(sql, BaseDocument.class); - try { - if (timeDoc != null){ - while (timeDoc.hasNext()) { - BaseDocument doc = timeDoc.next(); - maxTime = Long.parseLong(doc.getAttribute("max_time").toString()) + ApplicationConfig.THREAD_POOL_NUMBER; - minTime = Long.parseLong(doc.getAttribute("min_time").toString()); - } - long lastTime = System.currentTimeMillis(); - LOG.info(sql+"\n查询最大最小时间用时:" + (lastTime - startTime)); - }else { - LOG.warn("获取ArangoDb时间范围为空"); - } - }catch (Exception e){ - e.printStackTrace(); - } - return new long[]{minTime, maxTime}; - - } - - private String getQuerySql(long[] timeRange,int threadNumber,String table){ - long minTime = timeRange[0]; - long maxTime = timeRange[1]; - long diffTime = (maxTime - minTime) / ApplicationConfig.THREAD_POOL_NUMBER; - long maxThreadTime = minTime + (threadNumber + 1)* diffTime; - long minThreadTime = minTime + threadNumber * diffTime; - return "FOR doc IN "+table+" filter doc.FIRST_FOUND_TIME >= "+minThreadTime+" and doc.FIRST_FOUND_TIME <= "+maxThreadTime+" " + ApplicationConfig.ARANGODB_READ_LIMIT + " RETURN doc"; - } - -} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java index 7c89a18..bd1db2b 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java @@ -35,11 +35,10 @@ public class BaseClickhouseData { private DruidPooledConnection connection; private Statement statement; - void baseDocumentFromClickhouse(HashMap>> newMap, - Supplier getSqlSupplier, + HashMap>> baseDocumentFromClickhouse(Supplier getSqlSupplier, Function formatResultFunc) { long start = System.currentTimeMillis(); - initializeMap(newMap); + HashMap>> newMap = initializeMap(); String sql = getSqlSupplier.get(); try { connection = manger.getConnection(); @@ -60,18 +59,23 @@ public class BaseClickhouseData { }finally { manger.clear(statement,connection); } + return newMap; } - private void initializeMap(HashMap>> map){ + private HashMap>> initializeMap(){ try { + HashMap>> newDataMap = new HashMap<>(); for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { - map.put(i, new HashMap<>()); + newDataMap.put(i, new HashMap<>()); } + return newDataMap; }catch (Exception e){ e.printStackTrace(); - LOG.error("初始化数据失败"); + LOG.error("数据初始化失败 "+e.toString()); + return null; } } + } diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java b/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java index f890878..f281b5d 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java @@ -4,15 +4,8 @@ import cn.ac.iie.config.ApplicationConfig; import cn.ac.iie.service.ingestion.ReadClickhouseData; import cn.ac.iie.service.update.Document; import cn.ac.iie.service.update.relationship.LocateFqdn2Ip; -import cn.ac.iie.service.update.relationship.LocateSubscriber2Ip; -import cn.ac.iie.service.update.relationship.VisitIp2Fqdn; -import cn.ac.iie.service.update.vertex.Fqdn; -import cn.ac.iie.service.update.vertex.Ip; -import cn.ac.iie.service.update.vertex.Subscriber; -import cn.ac.iie.utils.ArangoDBConnect; import cn.ac.iie.utils.ExecutorThreadPool; import com.arangodb.entity.BaseDocument; -import 
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java b/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java
index f890878..f281b5d 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java
@@ -4,15 +4,8 @@
 import cn.ac.iie.config.ApplicationConfig;
 import cn.ac.iie.service.ingestion.ReadClickhouseData;
 import cn.ac.iie.service.update.Document;
 import cn.ac.iie.service.update.relationship.LocateFqdn2Ip;
-import cn.ac.iie.service.update.relationship.LocateSubscriber2Ip;
-import cn.ac.iie.service.update.relationship.VisitIp2Fqdn;
-import cn.ac.iie.service.update.vertex.Fqdn;
-import cn.ac.iie.service.update.vertex.Ip;
-import cn.ac.iie.service.update.vertex.Subscriber;
-import cn.ac.iie.utils.ArangoDBConnect;
 import cn.ac.iie.utils.ExecutorThreadPool;
 import com.arangodb.entity.BaseDocument;
-import com.arangodb.entity.BaseEdgeDocument;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -20,14 +13,10 @@
 import java.lang.reflect.Constructor;
 import java.sql.ResultSet;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
-import static cn.ac.iie.dao.BaseArangoData.*;
-import static cn.ac.iie.dao.BaseClickhouseData.*;
-
 /**
  * Business class for updating the graph database
  * @author wlh
@@ -35,74 +24,46 @@ import static cn.ac.iie.dao.BaseClickhouseData.*;
 public class UpdateGraphData {
     private static final Logger LOG = LoggerFactory.getLogger(UpdateGraphData.class);
     private static ExecutorThreadPool pool = ExecutorThreadPool.getInstance();
-    private static ArangoDBConnect arangoManger = ArangoDBConnect.getInstance();
-    private static BaseArangoData baseArangoData = new BaseArangoData();
     private static BaseClickhouseData baseClickhouseData = new BaseClickhouseData();
 
     public void updateArango(){
         long start = System.currentTimeMillis();
         try {
-
-//            updateDocument(newVertexFqdnMap, historyVertexFqdnMap, "FQDN", Fqdn.class, BaseDocument.class,
-//                    ReadClickhouseData::getVertexFqdnSql, ReadClickhouseData::getVertexFqdnDocument);
-//
-//            updateDocument(newVertexIpMap, historyVertexIpMap, "IP", Ip.class, BaseDocument.class,
-//                    ReadClickhouseData::getVertexIpSql, ReadClickhouseData::getVertexIpDocument);
-//
-//            updateDocument(newVertexSubscriberMap, historyVertexSubscriberMap, "SUBSCRIBER", Subscriber.class, BaseDocument.class,
-//                    ReadClickhouseData::getVertexSubscriberSql, ReadClickhouseData::getVertexSubscriberDocument);
-
-            updateDocument(newRelationFqdnAddressIpMap, historyRelationFqdnAddressIpMap, "R_LOCATE_FQDN2IP", LocateFqdn2Ip.class, BaseEdgeDocument.class,
+            updateDocument("ip_learning.r_locate_fqdn2ip_local", LocateFqdn2Ip.class,
                     ReadClickhouseData::getRelationshipFqdnAddressIpSql, ReadClickhouseData::getRelationFqdnAddressIpDocument);
 
-//            updateDocument(newRelationIpVisitFqdnMap, historyRelationIpVisitFqdnMap, "R_VISIT_IP2FQDN",
-//                    VisitIp2Fqdn.class, BaseEdgeDocument.class,
-//                    ReadClickhouseData::getRelationshipIpVisitFqdnSql, ReadClickhouseData::getRelationIpVisitFqdnDocument);
-
-//            updateDocument(newRelationSubsciberLocateIpMap, historyRelationSubsciberLocateIpMap, "R_LOCATE_SUBSCRIBER2IP",
-//                    LocateSubscriber2Ip.class, BaseEdgeDocument.class,
-//                    ReadClickhouseData::getRelationshipSubsciberLocateIpSql, ReadClickhouseData::getRelationshipSubsciberLocateIpDocument);
-
-
             long last = System.currentTimeMillis();
             LOG.info("iplearning application finished, elapsed: " + (last - start));
         }catch (Exception e){
             e.printStackTrace();
         }finally {
-            arangoManger.clean();
             pool.shutdown();
         }
     }
 
-    private <T extends BaseDocument> void updateDocument(HashMap<Integer, HashMap<String, ArrayList<T>>> newMap,
-                                ConcurrentHashMap<Integer, ConcurrentHashMap<String, T>> historyMap,
-                                String collection,
+    private <T extends BaseDocument> void updateDocument(String collection,
                                 Class<? extends Document<T>> taskType,
-                                Class<T> docmentType,
                                 Supplier<String> getSqlSupplier,
                                 Function<ResultSet, T> formatResultFunc
     ) {
         try {
-//            baseArangoData.readHistoryData(collection, historyMap, docmentType);
             LOG.info(collection + " reading clickhouse and assembling the result set");
-            baseClickhouseData.baseDocumentFromClickhouse(newMap, getSqlSupplier, formatResultFunc);
+            HashMap<Integer, HashMap<String, ArrayList<T>>> newMap =
+                    baseClickhouseData.baseDocumentFromClickhouse(getSqlSupplier, formatResultFunc);
             LOG.info(collection + " start updating");
             long start = System.currentTimeMillis();
             CountDownLatch countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER);
             for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++){
                 HashMap<String, ArrayList<T>> tmpNewMap = newMap.get(i);
-                ConcurrentHashMap<String, T> tmpHisMap = historyMap.get(i);
                 Constructor<? extends Document<T>> constructor = taskType.getConstructor(
                         HashMap.class,
-                        ArangoDBConnect.class,
                         String.class,
-                        ConcurrentHashMap.class,
                         CountDownLatch.class);
-                Document<T> docTask = (Document<T>) constructor.newInstance(tmpNewMap, arangoManger, collection, tmpHisMap, countDownLatch);
+                Document<T> docTask = (Document<T>) constructor.newInstance(tmpNewMap, collection, countDownLatch);
                 pool.executor(docTask);
             }
             countDownLatch.await();
@@ -110,11 +71,6 @@ public class UpdateGraphData {
             LOG.info(collection + " update finished, total time: " + (last - start));
         }catch (Exception e){
             e.printStackTrace();
-        }finally {
-            newMap.clear();
-            historyMap.clear();
         }
     }
-
-
 }
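
updateDocument instantiates its worker tasks reflectively, so the getConstructor call must match the slimmed-down three-argument task constructors exactly; with the ArangoDB path gone, the collection argument is now the ClickHouse target table. A sketch of the lookup as it stands after this change:

    // Task classes must keep a (HashMap, String, CountDownLatch) constructor,
    // or getConstructor throws NoSuchMethodException at runtime.
    Constructor<? extends Document<T>> ctor = taskType.getConstructor(
            HashMap.class, String.class, CountDownLatch.class);
    Document<T> task = ctor.newInstance(
            tmpNewMap, "ip_learning.r_locate_fqdn2ip_local", countDownLatch);
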
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadClickhouseData.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadClickhouseData.java
index a1cc7e7..6c0e357 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadClickhouseData.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadClickhouseData.java
@@ -153,11 +153,11 @@ public class ReadClickhouseData {
             clientIpTs[i] = currentHour;
         }
 
-        String key = vFqdn + "-!!-" + vIp;
+        String key = vFqdn + "-" + vIp;
         newDoc = new BaseEdgeDocument();
         newDoc.setKey(key);
-        newDoc.setFrom("FQDN/" + vFqdn);
-        newDoc.setTo("IP/" + vIp);
+        newDoc.setFrom(vFqdn);
+        newDoc.setTo(vIp);
         newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime);
         newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime);
         newDoc.addAttribute("DIST_CIP", distCipRecents);
@@ -177,7 +177,7 @@ public class ReadClickhouseData {
         String vFqdn = resultSet.getString("FQDN");
         if (isDomain(vFqdn)) {
             String vIp = resultSet.getString("common_client_ip");
-            String key = vIp + "-!!-" + vFqdn;
+            String key = vIp + "-" + vFqdn;
             long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME");
             long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME");
             long countTotal = resultSet.getLong("COUNT_TOTAL");
@@ -185,8 +185,8 @@
             newDoc = new BaseEdgeDocument();
             newDoc.setKey(key);
-            newDoc.setFrom("IP/" + vIp);
-            newDoc.setTo("FQDN/" + vFqdn);
+            newDoc.setFrom(vIp);
+            newDoc.setTo(vFqdn);
             newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime);
             newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime);
             newDoc.addAttribute("PROTOCOL_TYPE", schemaType);
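
The edge keys drop the "-!!-" separator along with the "FQDN/" and "IP/" collection prefixes: from/to now feed plain table columns (see Document.java below) rather than ArangoDB edge references, so the _key/_from/_to conventions no longer apply. For a hypothetical row:

    String vFqdn = "example.com", vIp = "203.0.113.7";  // hypothetical values
    String oldKey = vFqdn + "-!!-" + vIp;  // with _from "FQDN/example.com", _to "IP/203.0.113.7"
    String newKey = vFqdn + "-" + vIp;     // with from "example.com", to "203.0.113.7"

Since "-" can legitimately occur inside an FQDN, the shorter separator is no longer collision-free; that only matters where the key is still used to de-duplicate rows in the partition maps.
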
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadHistoryArangoData.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadHistoryArangoData.java
deleted file mode 100644
index d8ded7e..0000000
--- a/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadHistoryArangoData.java
+++ /dev/null
@@ -1,122 +0,0 @@
-package cn.ac.iie.service.ingestion;
-
-import cn.ac.iie.config.ApplicationConfig;
-import cn.ac.iie.utils.ArangoDBConnect;
-import com.arangodb.ArangoCursor;
-import com.arangodb.entity.BaseDocument;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-
-import static cn.ac.iie.service.ingestion.ReadClickhouseData.*;
-
-/**
- * @author wlh
- * Read the full ArangoDB history data with multiple threads and pack it into a map
- */
-@SuppressWarnings("unchecked")
-public class ReadHistoryArangoData<T extends BaseDocument> extends Thread {
-    private static final Logger LOG = LoggerFactory.getLogger(ReadHistoryArangoData.class);
-
-    private ArangoDBConnect arangoConnect;
-    private String query;
-    private ConcurrentHashMap<Integer, ConcurrentHashMap<String, T>> map;
-    private Class<T> type;
-    private String table;
-    private CountDownLatch countDownLatch;
-
-    public ReadHistoryArangoData(ArangoDBConnect arangoConnect,
-                                 String query,
-                                 ConcurrentHashMap<Integer, ConcurrentHashMap<String, T>> map,
-                                 Class<T> type,
-                                 String table,
-                                 CountDownLatch countDownLatch) {
-        this.arangoConnect = arangoConnect;
-        this.query = query;
-        this.map = map;
-        this.type = type;
-        this.table = table;
-        this.countDownLatch = countDownLatch;
-    }
-
-    @Override
-    public void run() {
-        try {
-            long s = System.currentTimeMillis();
-            ArangoCursor<T> docs = arangoConnect.executorQuery(query, type);
-            if (docs != null) {
-                ArrayList<T> list = new ArrayList<>();
-                List<T> baseDocuments = docs.asListRemaining();
-                int i = 0;
-                for (T doc : baseDocuments) {
-                    String key = doc.getKey();
-                    switch (table) {
-                        case "R_LOCATE_FQDN2IP":
-                            updateProtocolDocument(doc);
-                            deleteDistinctClientIpByTime(doc);
-                            list.add(doc);
-                            break;
-                        default:
-                    }
-                    int hashCode = Math.abs(key.hashCode()) % ApplicationConfig.THREAD_POOL_NUMBER;
-                    ConcurrentHashMap<String, T> tmpMap = map.get(hashCode);
-                    tmpMap.put(key, doc);
-                    i++;
-                }
-                arangoConnect.overwrite(list, table);
-                long l = System.currentTimeMillis();
-                LOG.info(query + "\nread " + i + " records, runtime: " + (l - s));
-            }
-        } catch (Exception e) {
-            LOG.error(Arrays.toString(e.getStackTrace()));
-        } finally {
-            countDownLatch.countDown();
-            LOG.info("This thread finished reading, remaining threads: " + countDownLatch.getCount());
-        }
-    }
-
-    private void updateProtocolDocument(T doc) {
-        if (doc.getProperties().containsKey("PROTOCOL_TYPE")) {
-            for (String protocol : ReadClickhouseData.PROTOCOL_SET) {
-                String protocolRecent = protocol + "_CNT_RECENT";
-                ArrayList<Long> cntRecent = (ArrayList<Long>) doc.getAttribute(protocolRecent);
-                Long[] cntRecentsSrc = cntRecent.toArray(new Long[0]);
-                Long[] cntRecentsDst = new Long[RECENT_COUNT_HOUR];
-                System.arraycopy(cntRecentsSrc, 0, cntRecentsDst, 1, cntRecentsSrc.length - 1);
-                cntRecentsDst[0] = 0L;
-                doc.addAttribute(protocolRecent, cntRecentsDst);
-            }
-        }
-    }
-
-    private void deleteDistinctClientIpByTime(T doc) {
-        ArrayList<String> distCip = (ArrayList<String>) doc.getAttribute("DIST_CIP");
-        ArrayList<Long> distCipTs = (ArrayList<Long>) doc.getAttribute("DIST_CIP_TS");
-        if (distCip == null || distCip.isEmpty()){
-            doc.updateAttribute("DIST_CIP", new String[0]);
-            doc.updateAttribute("DIST_CIP_TS", new long[0]);
-            return;
-        }
-        distCipTs.add(currentHour - RECENT_COUNT_HOUR * 3600);
-        Collections.sort(distCipTs);
-        Collections.reverse(distCipTs);
-        int index = distCipTs.indexOf(currentHour - RECENT_COUNT_HOUR * 3600);
-        String[] distCipArr = new String[index];
-        long[] disCipTsArr = new long[index];
-        if (index != 0 && distCip.size() + 1 == distCipTs.size()){
-            for (int i = 0; i < index; i++) {
-                distCipArr[i] = distCip.get(i);
-                disCipTsArr[i] = distCipTs.get(i);
-            }
-        }
-        doc.updateAttribute("DIST_CIP", distCipArr);
-        doc.updateAttribute("DIST_CIP_TS", disCipTsArr);
-    }
-
-}
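
Among the logic removed with ReadHistoryArangoData is the hourly aging of per-protocol counters: updateProtocolDocument shifted each *_CNT_RECENT window one slot toward the past and zeroed the slot for the new hour. The core of that shift, for reference:

    // One aging step for an hourly counter window, index 0 = the current hour,
    // as performed by the deleted updateProtocolDocument.
    static Long[] shiftHourlyWindow(Long[] src, int recentCountHour) {
        Long[] dst = new Long[recentCountHour];
        System.arraycopy(src, 0, dst, 1, src.length - 1); // shift right, dropping the oldest hour
        dst[0] = 0L;                                      // fresh bucket for the new hour
        return dst;
    }
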
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Document.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Document.java
index 30a957d..fe8e924 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Document.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Document.java
@@ -1,43 +1,32 @@
 package cn.ac.iie.service.update;
 
 import cn.ac.iie.config.ApplicationConfig;
-import cn.ac.iie.utils.ArangoDBConnect;
 import cn.ac.iie.utils.ClickhouseConnect;
 import com.alibaba.druid.pool.DruidPooledConnection;
 import com.arangodb.entity.BaseDocument;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import ru.yandex.clickhouse.ClickHouseArray;
-import ru.yandex.clickhouse.domain.ClickHouseDataType;
 
 import java.sql.PreparedStatement;
+import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 
-import static cn.ac.iie.service.ingestion.ReadClickhouseData.currentHour;
-
 public class Document<T extends BaseDocument> extends Thread{
     private static final Logger LOG = LoggerFactory.getLogger(Document.class);
 
     private HashMap<String, ArrayList<T>> newDocumentMap;
-    private ArangoDBConnect arangoManger;
     private String collectionName;
-    private ConcurrentHashMap<String, T> historyDocumentMap;
     private CountDownLatch countDownLatch;
     private ClickhouseConnect manger = ClickhouseConnect.getInstance();
 
     Document(HashMap<String, ArrayList<T>> newDocumentMap,
-             ArangoDBConnect arangoManger,
              String collectionName,
-             ConcurrentHashMap<String, T> historyDocumentMap,
              CountDownLatch countDownLatch) {
         this.newDocumentMap = newDocumentMap;
-        this.arangoManger = arangoManger;
         this.collectionName = collectionName;
-        this.historyDocumentMap = historyDocumentMap;
         this.countDownLatch = countDownLatch;
     }
 
@@ -45,28 +34,18 @@
     @Override
     public void run() {
         long start = System.currentTimeMillis();
-        LOG.info("Newly read " + newDocumentMap.size() + " records, history data " + historyDocumentMap.size() + " records");
+        LOG.info("Read " + newDocumentMap.size() + " records");
         try {
             Set<String> keySet = newDocumentMap.keySet();
             DruidPooledConnection connection = manger.getConnection();
-            String sql = "INSERT INTO ip_learning.r_locate_fqdn2ip_local VALUES(?,?,?,?,?,?,?,?,?)";
+            String sql = "INSERT INTO " + collectionName + " VALUES(?,?,?,?,?,?,?,?,?)";
             PreparedStatement pstm = connection.prepareStatement(sql);
             int i = 0;
             for (String key : keySet) {
                 ArrayList<T> newDocumentSchemaList = newDocumentMap.getOrDefault(key, null);
                 if (newDocumentSchemaList != null) {
                     T newDocument = mergeDocument(newDocumentSchemaList);
-                    String[] splitKey = key.split("-!!-");
-                    pstm.setString(1, splitKey[0]);
-                    pstm.setString(2, splitKey[1]);
-                    pstm.setLong(3, Long.parseLong(newDocument.getAttribute("FIRST_FOUND_TIME").toString()));
-                    pstm.setLong(4, Long.parseLong(newDocument.getAttribute("LAST_FOUND_TIME").toString()));
-                    pstm.setLong(5, Long.parseLong(newDocument.getAttribute("DNS_CNT_TOTAL").toString()));
-                    pstm.setLong(6, Long.parseLong(newDocument.getAttribute("TLS_CNT_TOTAL").toString()));
-                    pstm.setLong(7, Long.parseLong(newDocument.getAttribute("HTTP_CNT_TOTAL").toString()));
-                    Object[] distCips = (Object[]) newDocument.getAttribute("DIST_CIP");
-                    pstm.setArray(8, new ClickHouseArray(ClickHouseDataType.Int64, distCips));
-                    pstm.setLong(9, currentHour);
+                    pstm = setPstm(pstm, newDocument);
                     i += 1;
                     pstm.addBatch();
                     if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) {
@@ -92,18 +71,8 @@
         }
     }
 
-    private void updateDocument(T newDocument, T historyDocument, ArrayList<T> resultDocumentList) {
-        if (historyDocument != null){
-            updateFunction(newDocument, historyDocument);
-            resultDocumentList.add(historyDocument);
-        }else {
-            resultDocumentList.add(newDocument);
-        }
-    }
-
-    protected void updateFunction(T newDocument, T historyDocument) {
-        Object lastFoundTime = newDocument.getAttribute("LAST_FOUND_TIME");
-        historyDocument.addAttribute("LAST_FOUND_TIME", lastFoundTime);
+    protected PreparedStatement setPstm(PreparedStatement pstm, T newDocument) throws SQLException {
+        return pstm;
     }
 
     private T mergeDocument(ArrayList<T> newDocumentSchemaList){
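
Document.run is now a template method: the base class owns connection handling and batching, while column binding is delegated to setPstm, a no-op here that subclasses override per edge type (see LocateFqdn2Ip below). The loop follows the usual JDBC batching pattern; a sketch, assuming the flush in the lines the hunk cuts off calls executeBatch and resets the counter:

    pstm = setPstm(pstm, newDocument);   // subclass binds its own columns
    pstm.addBatch();
    if (++i >= ApplicationConfig.UPDATE_ARANGO_BATCH) {
        pstm.executeBatch();             // assumed flush; not shown in the hunk
        i = 0;
    }
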
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Relationship.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Relationship.java
index 7930463..659159c 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Relationship.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Relationship.java
@@ -1,27 +1,18 @@
 package cn.ac.iie.service.update;
 
 import cn.ac.iie.service.ingestion.ReadClickhouseData;
-import cn.ac.iie.utils.ArangoDBConnect;
 import com.arangodb.entity.BaseEdgeDocument;
 
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 
 public class Relationship extends Document<BaseEdgeDocument> {
 
     public Relationship(HashMap<String, ArrayList<BaseEdgeDocument>> newDocumentHashMap,
-                        ArangoDBConnect arangoManger,
                         String collectionName,
-                        ConcurrentHashMap<String, BaseEdgeDocument> historyDocumentMap,
                         CountDownLatch countDownLatch) {
-        super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap, countDownLatch);
-    }
-
-    @Override
-    protected void updateFunction(BaseEdgeDocument newEdgeDocument, BaseEdgeDocument historyEdgeDocument){
-        super.updateFunction(newEdgeDocument, historyEdgeDocument);
+        super(newDocumentHashMap, collectionName, countDownLatch);
     }
 
     @Override
@@ -29,26 +20,6 @@
         super.mergeFunction(lastDoc, newDocument);
     }
 
-    protected void updateProcotol(BaseEdgeDocument historyEdgeDocument, String schema, BaseEdgeDocument newEdgeDocument){
-        String recentSchema = schema + "_CNT_RECENT";
-        String totalSchema = schema + "_CNT_TOTAL";
-        long countTotal = Long.parseLong(newEdgeDocument.getAttribute(totalSchema).toString());
-        if (countTotal > 0L){
-            long updateCountTotal = Long.parseLong(historyEdgeDocument.getAttribute(totalSchema).toString());
-
-            Long[] cntRecent = (Long[]) historyEdgeDocument.getAttribute(recentSchema);
-            cntRecent[0] = countTotal;
-
-            historyEdgeDocument.addAttribute(recentSchema, cntRecent);
-            historyEdgeDocument.addAttribute(totalSchema, countTotal + updateCountTotal);
-            String hisProtocolType = historyEdgeDocument.getAttribute("PROTOCOL_TYPE").toString();
-            if (!hisProtocolType.contains(schema)){
-                hisProtocolType = hisProtocolType + "," + schema;
-                historyEdgeDocument.addAttribute("PROTOCOL_TYPE", hisProtocolType);
-            }
-        }
-    }
-
     protected void mergeProtocol(BaseEdgeDocument lastDoc, BaseEdgeDocument newDocument) {
         String schema = lastDoc.getAttribute("PROTOCOL_TYPE").toString();
         if (ReadClickhouseData.PROTOCOL_SET.contains(schema)){
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Vertex.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Vertex.java
index 10a4d29..bfe6373 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Vertex.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Vertex.java
@@ -1,11 +1,9 @@
 package cn.ac.iie.service.update;
 
-import cn.ac.iie.utils.ArangoDBConnect;
 import com.arangodb.entity.BaseDocument;
 
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 
 /**
@@ -15,11 +13,9 @@ import java.util.concurrent.CountDownLatch;
 public class Vertex extends Document<BaseDocument> {
 
     public Vertex(HashMap<String, ArrayList<BaseDocument>> newDocumentHashMap,
-                  ArangoDBConnect arangoManger,
                   String collectionName,
-                  ConcurrentHashMap<String, BaseDocument> historyDocumentMap,
                   CountDownLatch countDownLatch) {
-        super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap, countDownLatch);
+        super(newDocumentHashMap, collectionName, countDownLatch);
    }
}
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/relationship/LocateFqdn2Ip.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/relationship/LocateFqdn2Ip.java
index 82d1a1a..2ec2012 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/relationship/LocateFqdn2Ip.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/relationship/LocateFqdn2Ip.java
@@ -1,25 +1,23 @@
 package cn.ac.iie.service.update.relationship;
 
-import cn.ac.iie.service.ingestion.ReadClickhouseData;
 import cn.ac.iie.service.update.Relationship;
-import cn.ac.iie.utils.ArangoDBConnect;
 import com.arangodb.entity.BaseEdgeDocument;
+import ru.yandex.clickhouse.ClickHouseArray;
+import ru.yandex.clickhouse.domain.ClickHouseDataType;
 
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
 import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 
-import static cn.ac.iie.service.ingestion.ReadClickhouseData.DISTINCT_CLIENT_IP_NUM;
 import static cn.ac.iie.service.ingestion.ReadClickhouseData.currentHour;
 
 public class LocateFqdn2Ip extends Relationship {
 
     public LocateFqdn2Ip(HashMap<String, ArrayList<BaseEdgeDocument>> newDocumentHashMap,
-                         ArangoDBConnect arangoManger,
                          String collectionName,
-                         ConcurrentHashMap<String, BaseEdgeDocument> historyDocumentMap,
                          CountDownLatch countDownLatch) {
-        super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap, countDownLatch);
+        super(newDocumentHashMap, collectionName, countDownLatch);
     }
 
     @Override
@@ -29,7 +27,23 @@
         mergeProtocol(lastDoc, newDocument);
     }
 
-    private void mergeDistinctClientIp(BaseEdgeDocument lastDoc,BaseEdgeDocument newDocument){
+    @Override
+    protected PreparedStatement setPstm(PreparedStatement pstm, BaseEdgeDocument newDocument) throws SQLException {
+        pstm.setString(1, newDocument.getFrom());
+        pstm.setString(2, newDocument.getTo());
+        pstm.setLong(3, Long.parseLong(newDocument.getAttribute("FIRST_FOUND_TIME").toString()));
+        pstm.setLong(4, Long.parseLong(newDocument.getAttribute("LAST_FOUND_TIME").toString()));
+        pstm.setLong(5, Long.parseLong(newDocument.getAttribute("DNS_CNT_TOTAL").toString()));
+        pstm.setLong(6, Long.parseLong(newDocument.getAttribute("TLS_CNT_TOTAL").toString()));
+        pstm.setLong(7, Long.parseLong(newDocument.getAttribute("HTTP_CNT_TOTAL").toString()));
+        Object[] distCips = (Object[]) newDocument.getAttribute("DIST_CIP");
+        pstm.setArray(8, new ClickHouseArray(ClickHouseDataType.Int64, distCips));
+        pstm.setLong(9, currentHour);
+
+        return pstm;
+    }
+
+    private void mergeDistinctClientIp(BaseEdgeDocument lastDoc, BaseEdgeDocument newDocument){
         HashSet<String> clientIpSet = new HashSet<>();
         String[] distCips = (String[]) newDocument.getAttribute("DIST_CIP");
         String[] lastDistCips = (String[]) lastDoc.getAttribute("DIST_CIP");
@@ -43,56 +57,5 @@
         newDocument.addAttribute("DIST_CIP_TS", clientIpTs);
     }
 
-    @Override
-    protected void updateFunction(BaseEdgeDocument newEdgeDocument, BaseEdgeDocument historyEdgeDocument) {
-        super.updateFunction(newEdgeDocument, historyEdgeDocument);
-        for (String schema : ReadClickhouseData.PROTOCOL_SET){
-            updateProcotol(historyEdgeDocument, schema, newEdgeDocument);
-        }
-        updateDistinctClientIp(newEdgeDocument, historyEdgeDocument);
-    }
-
-    private void updateDistinctClientIp(BaseEdgeDocument newEdgeDocument, BaseEdgeDocument edgeDocument){
-        String[] distCip = (String[]) edgeDocument.getAttribute("DIST_CIP");
-        long[] distCipTs = (long[]) edgeDocument.getAttribute("DIST_CIP_TS");
-        HashMap<String, Long> distCipToTs = new HashMap<>();
-        if (distCip.length == distCipTs.length){
-            for (int i = 0; i < distCip.length; i++){
-                distCipToTs.put(distCip[i], distCipTs[i]);
-            }
-        }
-        Object[] distCipRecent = (Object[]) newEdgeDocument.getAttribute("DIST_CIP");
-        for (Object cip : distCipRecent){
-            distCipToTs.put(cip.toString(), currentHour);
-        }
-
-        Map<String, Long> sortDistCip = sortMapByValue(distCipToTs);
-        edgeDocument.addAttribute("DIST_CIP", sortDistCip.keySet().toArray());
-        edgeDocument.addAttribute("DIST_CIP_TS", sortDistCip.values().toArray());
-    }
-
-
-    /**
-     * Sort a Map by value
-     */
-    private Map<String, Long> sortMapByValue(Map<String, Long> oriMap) {
-        if (oriMap == null || oriMap.isEmpty()) {
-            return null;
-        }
-        Map<String, Long> sortedMap = new LinkedHashMap<>();
-        List<Map.Entry<String, Long>> entryList = new ArrayList<>(oriMap.entrySet());
-        entryList.sort((o1, o2) -> o2.getValue().compareTo(o1.getValue()));
-
-        if (entryList.size() > DISTINCT_CLIENT_IP_NUM){
-            for (Map.Entry<String, Long> set : entryList.subList(0, DISTINCT_CLIENT_IP_NUM)){
-                sortedMap.put(set.getKey(), set.getValue());
-            }
-        }else {
-            for (Map.Entry<String, Long> set : entryList){
-                sortedMap.put(set.getKey(), set.getValue());
-            }
-        }
-        return sortedMap;
-    }
 }
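
The setPstm override above fixes the nine-column order of the INSERT built in Document.run, so it has to stay in lockstep with the ip_learning.r_locate_fqdn2ip_local schema. One point worth verifying: DIST_CIP is assembled upstream as an array of client-IP strings, yet it is bound here as ClickHouseDataType.Int64. If the column is actually Array(String), the binding would presumably need to be:

    // Hypothetical correction, only if the DIST_CIP column is Array(String):
    pstm.setArray(8, new ClickHouseArray(ClickHouseDataType.String, distCips));
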
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/relationship/LocateSubscriber2Ip.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/relationship/LocateSubscriber2Ip.java
index 5ca4cb0..0aafce5 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/relationship/LocateSubscriber2Ip.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/relationship/LocateSubscriber2Ip.java
@@ -1,21 +1,17 @@
 package cn.ac.iie.service.update.relationship;
 
 import cn.ac.iie.service.update.Relationship;
-import cn.ac.iie.utils.ArangoDBConnect;
 import com.arangodb.entity.BaseEdgeDocument;
 
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 
 public class LocateSubscriber2Ip extends Relationship {
 
     public LocateSubscriber2Ip(HashMap<String, ArrayList<BaseEdgeDocument>> newDocumentHashMap,
-                               ArangoDBConnect arangoManger,
                                String collectionName,
-                               ConcurrentHashMap<String, BaseEdgeDocument> historyDocumentMap,
                                CountDownLatch countDownLatch) {
-        super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap, countDownLatch);
+        super(newDocumentHashMap, collectionName, countDownLatch);
     }
 }
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/relationship/VisitIp2Fqdn.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/relationship/VisitIp2Fqdn.java
index e240529..b4394d0 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/relationship/VisitIp2Fqdn.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/relationship/VisitIp2Fqdn.java
@@ -2,30 +2,17 @@ package cn.ac.iie.service.update.relationship;
 
 import cn.ac.iie.service.ingestion.ReadClickhouseData;
 import cn.ac.iie.service.update.Relationship;
-import cn.ac.iie.utils.ArangoDBConnect;
 import com.arangodb.entity.BaseEdgeDocument;
 
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 
 public class VisitIp2Fqdn extends Relationship {
 
     public VisitIp2Fqdn(HashMap<String, ArrayList<BaseEdgeDocument>> newDocumentHashMap,
-                        ArangoDBConnect arangoManger,
                         String collectionName,
-                        ConcurrentHashMap<String, BaseEdgeDocument> historyDocumentMap,
                         CountDownLatch countDownLatch) {
-        super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap, countDownLatch);
-    }
-
-    @Override
-    protected void updateFunction(BaseEdgeDocument newEdgeDocument, BaseEdgeDocument historyEdgeDocument) {
-        super.updateFunction(newEdgeDocument, historyEdgeDocument);
-        for (String schema : ReadClickhouseData.PROTOCOL_SET){
-            updateProcotol(historyEdgeDocument, schema, newEdgeDocument);
-        }
+        super(newDocumentHashMap, collectionName, countDownLatch);
     }
 
     @Override
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/vertex/Fqdn.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/vertex/Fqdn.java
index c13ca8c..0e78e2a 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/vertex/Fqdn.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/vertex/Fqdn.java
@@ -1,21 +1,17 @@
 package cn.ac.iie.service.update.vertex;
 
 import cn.ac.iie.service.update.Vertex;
-import cn.ac.iie.utils.ArangoDBConnect;
 import com.arangodb.entity.BaseDocument;
 
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 
 public class Fqdn extends Vertex {
 
     public Fqdn(HashMap<String, ArrayList<BaseDocument>> newDocumentHashMap,
-                ArangoDBConnect arangoManger,
                 String collectionName,
-                ConcurrentHashMap<String, BaseDocument> historyDocumentMap,
                 CountDownLatch countDownLatch) {
-        super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap, countDownLatch);
+        super(newDocumentHashMap, collectionName, countDownLatch);
    }
}
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/vertex/Ip.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/vertex/Ip.java
index 9a44fac..34d619d 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/vertex/Ip.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/vertex/Ip.java
@@ -1,29 +1,18 @@
 package cn.ac.iie.service.update.vertex;
 
 import cn.ac.iie.service.update.Vertex;
-import cn.ac.iie.utils.ArangoDBConnect;
 import com.arangodb.entity.BaseDocument;
 
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 
 public class Ip extends Vertex {
 
     public Ip(HashMap<String, ArrayList<BaseDocument>> newDocumentHashMap,
-              ArangoDBConnect arangoManger,
               String collectionName,
-              ConcurrentHashMap<String, BaseDocument> historyDocumentMap,
               CountDownLatch countDownLatch) {
-        super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap, countDownLatch);
-    }
-
-    @Override
-    protected void updateFunction(BaseDocument newDocument, BaseDocument historyDocument) {
-        super.updateFunction(newDocument, historyDocument);
-        updateIpByType(newDocument, historyDocument);
-        super.replaceAttribute(newDocument, historyDocument, "COMMON_LINK_INFO");
+        super(newDocumentHashMap, collectionName, countDownLatch);
     }
 
     @Override
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/vertex/Subscriber.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/vertex/Subscriber.java
index 02f1468..54c29e8 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/vertex/Subscriber.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/vertex/Subscriber.java
@@ -1,21 +1,17 @@
 package cn.ac.iie.service.update.vertex;
 
 import cn.ac.iie.service.update.Vertex;
-import cn.ac.iie.utils.ArangoDBConnect;
 import com.arangodb.entity.BaseDocument;
 
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 
 public class Subscriber extends Vertex {
 
     public Subscriber(HashMap<String, ArrayList<BaseDocument>> newDocumentHashMap,
-                      ArangoDBConnect arangoManger,
                      String collectionName,
-                      ConcurrentHashMap<String, BaseDocument> historyDocumentMap,
                      CountDownLatch countDownLatch) {
-        super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap, countDownLatch);
+        super(newDocumentHashMap, collectionName, countDownLatch);
    }
}
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java b/IP-learning-graph/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java
deleted file mode 100644
index 1346ee3..0000000
--- a/IP-learning-graph/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java
+++ /dev/null
@@ -1,96 +0,0 @@
-package cn.ac.iie.utils;
-
-import cn.ac.iie.config.ApplicationConfig;
-import com.arangodb.ArangoCollection;
-import com.arangodb.ArangoCursor;
-import com.arangodb.ArangoDB;
-import com.arangodb.ArangoDatabase;
-import com.arangodb.entity.DocumentCreateEntity;
-import com.arangodb.entity.ErrorEntity;
-import com.arangodb.entity.MultiDocumentEntity;
-import com.arangodb.model.AqlQueryOptions;
-import com.arangodb.model.DocumentCreateOptions;
-import com.arangodb.util.MapBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-public class ArangoDBConnect {
-    private static final Logger LOG = LoggerFactory.getLogger(ArangoDBConnect.class);
-    private static ArangoDB arangoDB = null;
-    private static ArangoDBConnect conn = null;
-
-    static {
-        getArangoDB();
-    }
-
-    private static void getArangoDB(){
-        arangoDB = new ArangoDB.Builder()
-                .maxConnections(ApplicationConfig.THREAD_POOL_NUMBER)
-                .host(ApplicationConfig.ARANGODB_HOST, ApplicationConfig.ARANGODB_PORT)
-                .user(ApplicationConfig.ARANGODB_USER)
-                .password(ApplicationConfig.ARANGODB_PASSWORD)
-                .build();
-    }
-
-    public static synchronized ArangoDBConnect getInstance(){
-        if (null == conn){
-            conn = new ArangoDBConnect();
-        }
-        return conn;
-    }
-
-    private ArangoDatabase getDatabase(){
-        return arangoDB.db(ApplicationConfig.ARANGODB_DB_NAME);
-    }
-
-    public void clean(){
-        try {
-            if (arangoDB != null){
-                arangoDB.shutdown();
-            }
-        }catch (Exception e){
-            LOG.error(e.getMessage());
-        }
-    }
-
-    public <T> ArangoCursor<T> executorQuery(String query, Class<T> type){
-        ArangoDatabase database = getDatabase();
-        Map<String, Object> bindVars = new MapBuilder().get();
-        AqlQueryOptions options = new AqlQueryOptions()
-                .ttl(ApplicationConfig.ARANGODB_TTL);
-        try {
-            return database.query(query, bindVars, options, type);
-        }catch (Exception e){
-            LOG.error(e.getMessage());
-            return null;
-        }finally {
-            bindVars.clear();
-        }
-    }
-
-    public <T> void overwrite(List<T> docOverwrite, String collectionName){
-        ArangoDatabase database = getDatabase();
-        try {
-            ArangoCollection collection = database.collection(collectionName);
-            if (!docOverwrite.isEmpty()){
-                DocumentCreateOptions documentCreateOptions = new DocumentCreateOptions();
-                documentCreateOptions.overwrite(true);
-                documentCreateOptions.silent(true);
-                MultiDocumentEntity<DocumentCreateEntity<T>> documentCreateEntityMultiDocumentEntity = collection.insertDocuments(docOverwrite, documentCreateOptions);
-                Collection<ErrorEntity> errors = documentCreateEntityMultiDocumentEntity.getErrors();
-                for (ErrorEntity errorEntity : errors){
-                    LOG.error("Error writing to ArangoDB: " + errorEntity.getErrorMessage());
-                }
-            }
-        }catch (Exception e){
-            LOG.error("Update failed: " + e.toString());
-        }finally {
-            docOverwrite.clear();
-        }
-    }
-
-}
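
The deleted ArangoDBConnect.overwrite leaned on the Java driver's bulk insert for upsert semantics: with overwrite(true), inserting a document whose _key already exists replaces the stored document instead of failing, and silent(true) suppresses the per-document result bodies. The pattern, assuming the same driver API:

    DocumentCreateOptions opts = new DocumentCreateOptions();
    opts.overwrite(true);  // replace documents that share a _key instead of raising a conflict
    opts.silent(true);     // don't return created-document entities
    collection.insertDocuments(docs, opts);
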
diff --git a/IP-learning-graph/src/test/java/cn/ac/iie/TestList.java b/IP-learning-graph/src/test/java/cn/ac/iie/TestList.java
index 2e5ac36..62288bf 100644
--- a/IP-learning-graph/src/test/java/cn/ac/iie/TestList.java
+++ b/IP-learning-graph/src/test/java/cn/ac/iie/TestList.java
@@ -1,6 +1,5 @@
 package cn.ac.iie;
 
-import cn.ac.iie.utils.ArangoDBConnect;
 import com.arangodb.ArangoCursor;
 import com.arangodb.entity.BaseEdgeDocument;
 
diff --git a/IP-learning-graph/src/test/java/cn/ac/iie/TestMap.java b/IP-learning-graph/src/test/java/cn/ac/iie/TestMap.java
index 46e6c88..8d7c31e 100644
--- a/IP-learning-graph/src/test/java/cn/ac/iie/TestMap.java
+++ b/IP-learning-graph/src/test/java/cn/ac/iie/TestMap.java
@@ -1,12 +1,8 @@
 package cn.ac.iie;
 
-import cn.ac.iie.dao.BaseArangoData;
-import cn.ac.iie.utils.ArangoDBConnect;
-import cn.ac.iie.utils.ExecutorThreadPool;
 import com.arangodb.entity.*;
 
 import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
 
 public class TestMap {
 
diff --git a/IP-learning-graph/src/test/java/cn/ac/iie/readHistoryDataTest.java b/IP-learning-graph/src/test/java/cn/ac/iie/readHistoryDataTest.java
deleted file mode 100644
index 93c4e49..0000000
--- a/IP-learning-graph/src/test/java/cn/ac/iie/readHistoryDataTest.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package cn.ac.iie;
-
-import cn.ac.iie.dao.BaseArangoData;
-import com.arangodb.entity.BaseEdgeDocument;
-
-public class readHistoryDataTest {
-    public static void main(String[] args) {
-        BaseArangoData baseArangoData = new BaseArangoData();
-
-    }
-}