diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java b/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java index 12fc1bd..f890878 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java @@ -44,14 +44,14 @@ public class UpdateGraphData { long start = System.currentTimeMillis(); try { - updateDocument(newVertexFqdnMap, historyVertexFqdnMap, "FQDN", Fqdn.class,BaseDocument.class, - ReadClickhouseData::getVertexFqdnSql,ReadClickhouseData::getVertexFqdnDocument); - - updateDocument(newVertexIpMap,historyVertexIpMap,"IP", Ip.class,BaseDocument.class, - ReadClickhouseData::getVertexIpSql,ReadClickhouseData::getVertexIpDocument); - - updateDocument(newVertexSubscriberMap,historyVertexSubscriberMap,"SUBSCRIBER", Subscriber.class,BaseDocument.class, - ReadClickhouseData::getVertexSubscriberSql,ReadClickhouseData::getVertexSubscriberDocument); +// updateDocument(newVertexFqdnMap, historyVertexFqdnMap, "FQDN", Fqdn.class,BaseDocument.class, +// ReadClickhouseData::getVertexFqdnSql,ReadClickhouseData::getVertexFqdnDocument); +// +// updateDocument(newVertexIpMap,historyVertexIpMap,"IP", Ip.class,BaseDocument.class, +// ReadClickhouseData::getVertexIpSql,ReadClickhouseData::getVertexIpDocument); +// +// updateDocument(newVertexSubscriberMap,historyVertexSubscriberMap,"SUBSCRIBER", Subscriber.class,BaseDocument.class, +// ReadClickhouseData::getVertexSubscriberSql,ReadClickhouseData::getVertexSubscriberDocument); updateDocument(newRelationFqdnAddressIpMap,historyRelationFqdnAddressIpMap,"R_LOCATE_FQDN2IP", LocateFqdn2Ip.class,BaseEdgeDocument.class, ReadClickhouseData::getRelationshipFqdnAddressIpSql,ReadClickhouseData::getRelationFqdnAddressIpDocument); @@ -60,9 +60,9 @@ public class UpdateGraphData { // VisitIp2Fqdn.class,BaseEdgeDocument.class, // ReadClickhouseData::getRelationshipIpVisitFqdnSql,ReadClickhouseData::getRelationIpVisitFqdnDocument); - updateDocument(newRelationSubsciberLocateIpMap,historyRelationSubsciberLocateIpMap,"R_LOCATE_SUBSCRIBER2IP", - LocateSubscriber2Ip.class,BaseEdgeDocument.class, - ReadClickhouseData::getRelationshipSubsciberLocateIpSql,ReadClickhouseData::getRelationshipSubsciberLocateIpDocument); +// updateDocument(newRelationSubsciberLocateIpMap,historyRelationSubsciberLocateIpMap,"R_LOCATE_SUBSCRIBER2IP", +// LocateSubscriber2Ip.class,BaseEdgeDocument.class, +// ReadClickhouseData::getRelationshipSubsciberLocateIpSql,ReadClickhouseData::getRelationshipSubsciberLocateIpDocument); long last = System.currentTimeMillis(); @@ -86,7 +86,7 @@ public class UpdateGraphData { ) { try { - baseArangoData.readHistoryData(collection,historyMap,docmentType); +// baseArangoData.readHistoryData(collection,historyMap,docmentType); LOG.info(collection+" 读取clickhouse,封装结果集"); baseClickhouseData.baseDocumentFromClickhouse(newMap, getSqlSupplier,formatResultFunc); diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadClickhouseData.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadClickhouseData.java index da2d897..a1cc7e7 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadClickhouseData.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadClickhouseData.java @@ -153,7 +153,7 @@ public class ReadClickhouseData { clientIpTs[i] = currentHour; } - String key = vFqdn + "-" + vIp; + String key = vFqdn + "-!!-" + vIp; newDoc = new BaseEdgeDocument(); newDoc.setKey(key); newDoc.setFrom("FQDN/" + vFqdn); @@ -177,7 +177,7 @@ public class ReadClickhouseData { String vFqdn = resultSet.getString("FQDN"); if (isDomain(vFqdn)) { String vIp = resultSet.getString("common_client_ip"); - String key = vIp + "-" + vFqdn; + String key = vIp + "-!!-" + vFqdn; long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); long countTotal = resultSet.getLong("COUNT_TOTAL"); @@ -214,12 +214,8 @@ public class ReadClickhouseData { if (fqdn == null || fqdn.length() == 0){ return false; } - if (fqdn.contains(":")){ - String s = fqdn.split(":")[0]; - if (s.contains(":")){ - return false; - } - } + fqdn = fqdn.split(":")[0]; + String[] fqdnArr = fqdn.split("\\."); if (fqdnArr.length < 4 || fqdnArr.length > 4) { return true; diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Document.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Document.java index 436b83d..30a957d 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Document.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Document.java @@ -2,16 +2,23 @@ package cn.ac.iie.service.update; import cn.ac.iie.config.ApplicationConfig; import cn.ac.iie.utils.ArangoDBConnect; +import cn.ac.iie.utils.ClickhouseConnect; +import com.alibaba.druid.pool.DruidPooledConnection; import com.arangodb.entity.BaseDocument; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import ru.yandex.clickhouse.ClickHouseArray; +import ru.yandex.clickhouse.domain.ClickHouseDataType; +import java.sql.PreparedStatement; import java.util.ArrayList; import java.util.HashMap; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import static cn.ac.iie.service.ingestion.ReadClickhouseData.currentHour; + public class Document extends Thread{ private static final Logger LOG = LoggerFactory.getLogger(Document.class); private HashMap> newDocumentMap; @@ -20,6 +27,8 @@ public class Document extends Thread{ private ConcurrentHashMap historyDocumentMap; private CountDownLatch countDownLatch; + private ClickhouseConnect manger = ClickhouseConnect.getInstance(); + Document(HashMap> newDocumentMap, ArangoDBConnect arangoManger, String collectionName, @@ -39,25 +48,39 @@ public class Document extends Thread{ LOG.info("新读取数据"+newDocumentMap.size()+"条,历史数据"+historyDocumentMap.size()+"条"); try { Set keySet = newDocumentMap.keySet(); - ArrayList resultDocumentList = new ArrayList<>(); + DruidPooledConnection connection = manger.getConnection(); + String sql = "INSERT INTO ip_learning.r_locate_fqdn2ip_local VALUES(?,?,?,?,?,?,?,?,?)"; + PreparedStatement pstm = connection.prepareStatement(sql); int i = 0; for (String key : keySet) { ArrayList newDocumentSchemaList = newDocumentMap.getOrDefault(key, null); if (newDocumentSchemaList != null) { T newDocument = mergeDocument(newDocumentSchemaList); + String[] splitKey = key.split("-!!-"); + pstm.setString(1,splitKey[0]); + pstm.setString(2,splitKey[1]); + pstm.setLong(3,Long.parseLong(newDocument.getAttribute("FIRST_FOUND_TIME").toString())); + pstm.setLong(4,Long.parseLong(newDocument.getAttribute("LAST_FOUND_TIME").toString())); + pstm.setLong(5,Long.parseLong(newDocument.getAttribute("DNS_CNT_TOTAL").toString())); + pstm.setLong(6,Long.parseLong(newDocument.getAttribute("TLS_CNT_TOTAL").toString())); + pstm.setLong(7,Long.parseLong(newDocument.getAttribute("HTTP_CNT_TOTAL").toString())); + Object[] distCips = (Object[]) newDocument.getAttribute("DIST_CIP"); + pstm.setArray(8,new ClickHouseArray(ClickHouseDataType.Int64, distCips)); + pstm.setLong(9,currentHour); i += 1; - T historyDocument = historyDocumentMap.getOrDefault(key, null); - updateDocument(newDocument,historyDocument,resultDocumentList); + pstm.addBatch(); if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) { - arangoManger.overwrite(resultDocumentList, collectionName); - LOG.info("更新"+collectionName+":" + i); + pstm.executeBatch(); + connection.commit(); + LOG.warn("写入clickhouse数据量:" + i); i = 0; } } } if (i != 0) { - arangoManger.overwrite(resultDocumentList, collectionName); - LOG.info("更新"+collectionName+":" + i); + pstm.executeBatch(); + connection.commit(); + LOG.warn("写入clickhouse数据量:" + i); } } catch (Exception e) { e.printStackTrace(); diff --git a/IP-learning-graph/src/main/resources/application.properties b/IP-learning-graph/src/main/resources/application.properties index fec32ff..56d8d1c 100644 --- a/IP-learning-graph/src/main/resources/application.properties +++ b/IP-learning-graph/src/main/resources/application.properties @@ -15,8 +15,8 @@ thread.await.termination.time=10 #读取clickhouse时间范围方式,0:读取过去一小时,1:指定时间范围 -time.limit.type=0 -read.clickhouse.max.time=1598323368 +time.limit.type=1 +read.clickhouse.max.time=1598433621 read.clickhouse.min.time=1597222501 update.interval=3600