From 6ea2518ee498c54eee132d87c8d77909bb9b307f Mon Sep 17 00:00:00 2001 From: wanglihui <949764788@qq.com> Date: Fri, 7 Aug 2020 10:07:52 +0800 Subject: [PATCH 1/4] =?UTF-8?q?IP=20Learning=20tsg=E9=A1=B9=E7=9B=AE=20spa?= =?UTF-8?q?rk=E7=89=88=E6=9C=AC=E9=A6=96=E6=AC=A1=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ac/iie/service/update/UpdateDocument.scala | 4 ++++ .../test/scala/cn/ac/iie/test/threadTest.java | 18 ++++++++++++++++++ 2 files changed, 22 insertions(+) create mode 100644 ip-learning-spark/src/test/scala/cn/ac/iie/test/threadTest.java diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala index b5f875f..038e301 100644 --- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala +++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala @@ -22,6 +22,10 @@ object UpdateDocument { private val LOG = LoggerFactory.getLogger(UpdateDocument.getClass) private val baseArangoData = new BaseArangoData() + def updateDocument(): Unit ={ + + } + def updateVertexFqdn(): Unit ={ baseArangoData.readHistoryData("FQDN",historyVertexFqdnMap,classOf[BaseDocument]) val hisVerFqdnBc = spark.sparkContext.broadcast(historyVertexFqdnMap) diff --git a/ip-learning-spark/src/test/scala/cn/ac/iie/test/threadTest.java b/ip-learning-spark/src/test/scala/cn/ac/iie/test/threadTest.java new file mode 100644 index 0000000..912f4a2 --- /dev/null +++ b/ip-learning-spark/src/test/scala/cn/ac/iie/test/threadTest.java @@ -0,0 +1,18 @@ +package cn.ac.iie.test; + +public class threadTest implements Runnable { + private int count = 10; + @Override + public /*synchronized*/ void run() { + count--; + System.out.println(Thread.currentThread().getName() + " count = " + count); + } + + public static void main(String[] args) { + threadTest t = new threadTest(); + for(int i=0; i<5; i++) { + new Thread(t, "THREAD" + i).start(); + } + } + +} From 2a4d6dda4ad2c9b1f57994938abccaadc086ac07 Mon Sep 17 00:00:00 2001 From: wanglihui <949764788@qq.com> Date: Fri, 7 Aug 2020 10:39:22 +0800 Subject: [PATCH 2/4] =?UTF-8?q?IP=20Learning=20tsg=E9=A1=B9=E7=9B=AE=20spa?= =?UTF-8?q?rk=E7=89=88=E6=9C=AC=E9=A6=96=E6=AC=A1=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iie/service/update/UpdateDocument.scala | 125 ++++++++++-------- 1 file changed, 68 insertions(+), 57 deletions(-) diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala index 038e301..b417624 100644 --- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala +++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala @@ -1,5 +1,7 @@ package cn.ac.iie.service.update + import java.util +import java.util.concurrent.ConcurrentHashMap import cn.ac.iie.config.ApplicationConfig import cn.ac.iie.dao.BaseArangoData @@ -10,6 +12,7 @@ import cn.ac.iie.utils.ArangoDBConnect import cn.ac.iie.utils.SparkSessionUtil.spark import com.arangodb.entity.{BaseDocument, BaseEdgeDocument} import org.apache.spark.TaskContext +import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.sql.Row import org.slf4j.LoggerFactory @@ -22,12 +25,20 @@ object UpdateDocument { private val LOG = 
LoggerFactory.getLogger(UpdateDocument.getClass) private val baseArangoData = new BaseArangoData() - def updateDocument(): Unit ={ + def updateDocument[T <: BaseDocument](collName: String, + historyMap: ConcurrentHashMap[Integer, ConcurrentHashMap[String, T]], + getDocumentRow: Row => T, + getNewDataRdd: Unit => RDD[Row] + ): Unit = { + baseArangoData.readHistoryData(collName, historyMap, classOf[T]) + val hisBc = spark.sparkContext.broadcast(historyMap) + try { + } } - def updateVertexFqdn(): Unit ={ - baseArangoData.readHistoryData("FQDN",historyVertexFqdnMap,classOf[BaseDocument]) + def updateVertexFqdn(): Unit = { + baseArangoData.readHistoryData("FQDN", historyVertexFqdnMap, classOf[BaseDocument]) val hisVerFqdnBc = spark.sparkContext.broadcast(historyVertexFqdnMap) try { val start = System.currentTimeMillis() @@ -41,19 +52,19 @@ object UpdateDocument { val fqdn = row.getAs[String]("FQDN") val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME") val firstFoundTime = row.getAs[Long]("FIRST_FOUND_TIME") - var document: BaseDocument = hisVerFqdnMapTmp.getOrDefault(fqdn,null) - if (document != null){ - updateMaxAttribute(document,lastFoundTime,"LAST_FOUND_TIME") - } else{ + var document: BaseDocument = hisVerFqdnMapTmp.getOrDefault(fqdn, null) + if (document != null) { + updateMaxAttribute(document, lastFoundTime, "LAST_FOUND_TIME") + } else { document = new BaseDocument document.setKey(fqdn) - document.addAttribute("FQDN_NAME",fqdn) - document.addAttribute("FIRST_FOUND_TIME",firstFoundTime) - document.addAttribute("LAST_FOUND_TIME",lastFoundTime) + document.addAttribute("FQDN_NAME", fqdn) + document.addAttribute("FIRST_FOUND_TIME", firstFoundTime) + document.addAttribute("LAST_FOUND_TIME", lastFoundTime) } resultDocumentList.add(document) - i+=1 - if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH){ + i += 1 + if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) { arangoManger.overwrite(resultDocumentList, "FQDN") LOG.warn("更新FQDN:" + i) i = 0 @@ -65,16 +76,16 @@ object UpdateDocument { } }) val last = System.currentTimeMillis() - LOG.warn(s"更新FQDN时间:${last-start}") - }catch { - case e:Exception => e.printStackTrace() - }finally { + LOG.warn(s"更新FQDN时间:${last - start}") + } catch { + case e: Exception => e.printStackTrace() + } finally { hisVerFqdnBc.destroy() } } - def updateVertexIp(): Unit ={ - baseArangoData.readHistoryData("IP",historyVertexIpMap,classOf[BaseDocument]) + def updateVertexIp(): Unit = { + baseArangoData.readHistoryData("IP", historyVertexIpMap, classOf[BaseDocument]) val hisVerIpBc = spark.sparkContext.broadcast(historyVertexIpMap) try { val start = System.currentTimeMillis() @@ -91,30 +102,30 @@ object UpdateDocument { val sessionCountList = row.getAs[ofRef[AnyRef]]("SESSION_COUNT_LIST") val bytesSumList = row.getAs[ofRef[AnyRef]]("BYTES_SUM_LIST") val ipTypeList = row.getAs[ofRef[String]]("ip_type_list") - val sepAttributeTuple = separateAttributeByIpType(ipTypeList,sessionCountList,bytesSumList) + val sepAttributeTuple = separateAttributeByIpType(ipTypeList, sessionCountList, bytesSumList) - var document = hisVerIpMapTmp.getOrDefault(ip,null) - if (document != null){ - updateMaxAttribute(document,lastFoundTime,"LAST_FOUND_TIME") - updateSumAttribute(document,sepAttributeTuple._1,"SERVER_SESSION_COUNT") - updateSumAttribute(document,sepAttributeTuple._2,"SERVER_BYTES_SUM") - updateSumAttribute(document,sepAttributeTuple._3,"CLIENT_SESSION_COUNT") - updateSumAttribute(document,sepAttributeTuple._4,"CLIENT_BYTES_SUM") + var document = hisVerIpMapTmp.getOrDefault(ip, null) + if 
(document != null) { + updateMaxAttribute(document, lastFoundTime, "LAST_FOUND_TIME") + updateSumAttribute(document, sepAttributeTuple._1, "SERVER_SESSION_COUNT") + updateSumAttribute(document, sepAttributeTuple._2, "SERVER_BYTES_SUM") + updateSumAttribute(document, sepAttributeTuple._3, "CLIENT_SESSION_COUNT") + updateSumAttribute(document, sepAttributeTuple._4, "CLIENT_BYTES_SUM") } else { document = new BaseDocument document.setKey(ip) - document.addAttribute("IP",ip) - document.addAttribute("FIRST_FOUND_TIME",firstFoundTime) - document.addAttribute("LAST_FOUND_TIME",lastFoundTime) - document.addAttribute("SERVER_SESSION_COUNT",sepAttributeTuple._1) - document.addAttribute("SERVER_BYTES_SUM",sepAttributeTuple._2) - document.addAttribute("CLIENT_SESSION_COUNT",sepAttributeTuple._3) - document.addAttribute("CLIENT_BYTES_SUM",sepAttributeTuple._4) - document.addAttribute("COMMON_LINK_INFO","") + document.addAttribute("IP", ip) + document.addAttribute("FIRST_FOUND_TIME", firstFoundTime) + document.addAttribute("LAST_FOUND_TIME", lastFoundTime) + document.addAttribute("SERVER_SESSION_COUNT", sepAttributeTuple._1) + document.addAttribute("SERVER_BYTES_SUM", sepAttributeTuple._2) + document.addAttribute("CLIENT_SESSION_COUNT", sepAttributeTuple._3) + document.addAttribute("CLIENT_BYTES_SUM", sepAttributeTuple._4) + document.addAttribute("COMMON_LINK_INFO", "") } resultDocumentList.add(document) - i+=1 - if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH){ + i += 1 + if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) { arangoManger.overwrite(resultDocumentList, "IP") LOG.warn("更新IP:" + i) i = 0 @@ -126,15 +137,15 @@ object UpdateDocument { } }) val last = System.currentTimeMillis() - LOG.warn(s"更新IP时间:${last-start}") - }catch { - case e:Exception => e.printStackTrace() - }finally { + LOG.warn(s"更新IP时间:${last - start}") + } catch { + case e: Exception => e.printStackTrace() + } finally { hisVerIpBc.destroy() } } - def updateRelationFqdnLocateIp(): Unit ={ + def updateRelationFqdnLocateIp(): Unit = { baseArangoData.readHistoryData("R_LOCATE_FQDN2IP", historyRelationFqdnAddressIpMap, classOf[BaseEdgeDocument]) val hisReFqdnLocIpBc = spark.sparkContext.broadcast(historyRelationFqdnAddressIpMap) try { @@ -154,28 +165,28 @@ object UpdateDocument { val schemaTypeList = row.getAs[ofRef[AnyRef]]("schema_type_list") val distCipRecent = row.getAs[ofRef[ofRef[String]]]("DIST_CIP_RECENT") - val sepAttritubeMap: Map[String, Long] = separateAttributeByProtocol(schemaTypeList,countTotalList) + val sepAttritubeMap: Map[String, Long] = separateAttributeByProtocol(schemaTypeList, countTotalList) val distinctIp: Array[String] = mergeDistinctIp(distCipRecent) - val key = fqdn.concat("-"+serverIp) - var document: BaseEdgeDocument = hisRelaFqdnLocaIpMapTmp.getOrDefault(key,null) - if (document != null){ - updateMaxAttribute(document,lastFoundTime,"LAST_FOUND_TIME") - updateProtocolAttritube(document,sepAttritubeMap) - updateDistinctIp(document,distinctIp) - }else { + val key = fqdn.concat("-" + serverIp) + var document: BaseEdgeDocument = hisRelaFqdnLocaIpMapTmp.getOrDefault(key, null) + if (document != null) { + updateMaxAttribute(document, lastFoundTime, "LAST_FOUND_TIME") + updateProtocolAttritube(document, sepAttritubeMap) + updateDistinctIp(document, distinctIp) + } else { document = new BaseEdgeDocument() document.setKey(key) document.setFrom("FQDN/" + fqdn) document.setTo("IP/" + serverIp) document.addAttribute("FIRST_FOUND_TIME", firstFoundTime) document.addAttribute("LAST_FOUND_TIME", lastFoundTime) - 
putProtocolAttritube(document,sepAttritubeMap) - putDistinctIp(document,distinctIp) + putProtocolAttritube(document, sepAttritubeMap) + putDistinctIp(document, distinctIp) } resultDocumentList.add(document) - i+=1 - if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH){ + i += 1 + if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) { arangoManger.overwrite(resultDocumentList, "R_LOCATE_FQDN2IP") LOG.warn("更新R_LOCATE_FQDN2IP:" + i) i = 0 @@ -187,10 +198,10 @@ object UpdateDocument { } }) val last = System.currentTimeMillis() - LOG.warn(s"更新R_LOCATE_FQDN2IP时间:${last-start}") - }catch { - case e:Exception => e.printStackTrace() - }finally { + LOG.warn(s"更新R_LOCATE_FQDN2IP时间:${last - start}") + } catch { + case e: Exception => e.printStackTrace() + } finally { hisReFqdnLocIpBc.destroy() } } From e946b506d399140bdf3c722139de031628f28ab0 Mon Sep 17 00:00:00 2001 From: wanglihui <949764788@qq.com> Date: Mon, 10 Aug 2020 18:37:53 +0800 Subject: [PATCH 3/4] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E6=97=A5=E5=BF=97?= =?UTF-8?q?=E8=BE=93=E5=87=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ingestion/ReadHistoryArangoData.java | 3 ++- .../java/cn/ac/iie/utils/ArangoDBConnect.java | 3 ++- .../src/test/java/cn/ac/iie/TestList.java | 21 +++++++++++++++++++ 3 files changed, 25 insertions(+), 2 deletions(-) diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadHistoryArangoData.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadHistoryArangoData.java index a4541f9..8b51128 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadHistoryArangoData.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadHistoryArangoData.java @@ -19,6 +19,7 @@ import static cn.ac.iie.service.ingestion.ReadClickhouseData.*; * @author wlh * 多线程全量读取arangoDb历史数据,封装到map */ +@SuppressWarnings("unchecked") public class ReadHistoryArangoData extends Thread { private static final Logger LOG = LoggerFactory.getLogger(ReadHistoryArangoData.class); @@ -84,7 +85,7 @@ public class ReadHistoryArangoData extends Thread { for (String protocol : ReadClickhouseData.PROTOCOL_SET) { String protocolRecent = protocol + "_CNT_RECENT"; ArrayList cntRecent = (ArrayList) doc.getAttribute(protocolRecent); - Long[] cntRecentsSrc = cntRecent.toArray(new Long[cntRecent.size()]); + Long[] cntRecentsSrc = cntRecent.toArray(new Long[0]); Long[] cntRecentsDst = new Long[RECENT_COUNT_HOUR]; System.arraycopy(cntRecentsSrc, 0, cntRecentsDst, 1, cntRecentsSrc.length - 1); cntRecentsDst[0] = 0L; diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java b/IP-learning-graph/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java index fd4d91e..1346ee3 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java @@ -16,6 +16,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collection; +import java.util.List; import java.util.Map; public class ArangoDBConnect { @@ -71,7 +72,7 @@ public class ArangoDBConnect { } } - public void overwrite(ArrayList docOverwrite,String collectionName){ + public void overwrite(List docOverwrite, String collectionName){ ArangoDatabase database = getDatabase(); try { ArangoCollection collection = database.collection(collectionName); diff --git a/IP-learning-graph/src/test/java/cn/ac/iie/TestList.java b/IP-learning-graph/src/test/java/cn/ac/iie/TestList.java index 83ce03c..d437595 
100644 --- a/IP-learning-graph/src/test/java/cn/ac/iie/TestList.java +++ b/IP-learning-graph/src/test/java/cn/ac/iie/TestList.java @@ -1,10 +1,25 @@ package cn.ac.iie; +import cn.ac.iie.utils.ArangoDBConnect; +import com.arangodb.ArangoCursor; +import com.arangodb.entity.BaseEdgeDocument; + import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; +import java.util.List; public class TestList { public static void main(String[] args) { + ArangoDBConnect arangoConnect = ArangoDBConnect.getInstance(); + ArangoCursor documents = arangoConnect.executorQuery("FOR doc IN R_LOCATE_FQDN2IP filter doc.FIRST_FOUND_TIME >= 1596080839 and doc.FIRST_FOUND_TIME <= 1596395473 RETURN doc", BaseEdgeDocument.class); + List baseEdgeDocuments = documents.asListRemaining(); + for (BaseEdgeDocument doc: baseEdgeDocuments){ + doc.updateAttribute("PROTOCOL_TYPE","123"); + } + + + /* ArrayList integers = new ArrayList<>(); integers.add(10); integers.add(8); @@ -14,11 +29,17 @@ public class TestList { integers.add(4); integers.add(4); integers.add(12); + + Integer[] objects = integers.toArray(new Integer[2]); + System.out.println(Arrays.toString(objects)); + + Collections.sort(integers); System.out.println(integers); integers.add(5); Collections.sort(integers); System.out.println(integers); System.out.println(integers.indexOf(5)); + */ } } From 2592b5b8aa25c3aea746e58b4cdf4fab7a1979ba Mon Sep 17 00:00:00 2001 From: wanglihui <949764788@qq.com> Date: Mon, 10 Aug 2020 18:38:15 +0800 Subject: [PATCH 4/4] =?UTF-8?q?=E6=8A=BD=E8=B1=A1=E5=85=AC=E5=85=B1?= =?UTF-8?q?=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/resources/application.properties | 2 +- .../ac/iie/main/IpLearningApplication.scala | 16 +- .../iie/service/update/UpdateDocument.scala | 263 +++++++----------- 3 files changed, 110 insertions(+), 171 deletions(-) diff --git a/ip-learning-spark/src/main/resources/application.properties b/ip-learning-spark/src/main/resources/application.properties index 473771f..0010b23 100644 --- a/ip-learning-spark/src/main/resources/application.properties +++ b/ip-learning-spark/src/main/resources/application.properties @@ -42,4 +42,4 @@ update.arango.batch=10000 distinct.client.ip.num=10000 recent.count.hour=24 -update.interval=3600 +update.interval=10800 diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/main/IpLearningApplication.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/main/IpLearningApplication.scala index e3602d3..b190ad9 100644 --- a/ip-learning-spark/src/main/scala/cn/ac/iie/main/IpLearningApplication.scala +++ b/ip-learning-spark/src/main/scala/cn/ac/iie/main/IpLearningApplication.scala @@ -1,22 +1,10 @@ package cn.ac.iie.main -import cn.ac.iie.service.update.UpdateDocument._ -import cn.ac.iie.utils.{ExecutorThreadPool, SparkSessionUtil} +import cn.ac.iie.service.update.UpdateDocument object IpLearningApplication { - private val pool = ExecutorThreadPool.getInstance def main(args: Array[String]): Unit = { - try { - updateVertexFqdn() - updateVertexIp() - updateRelationFqdnLocateIp() - }catch { - case e:Exception => e.printStackTrace() - }finally { - pool.shutdown() - arangoManger.clean() - SparkSessionUtil.closeSpark() - } + UpdateDocument.update() } } diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala index b417624..b7d4875 100644 --- 
a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala +++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala @@ -8,11 +8,10 @@ import cn.ac.iie.dao.BaseArangoData import cn.ac.iie.dao.BaseArangoData._ import cn.ac.iie.service.transform.MergeDataFrame._ import cn.ac.iie.service.update.UpdateDocHandler._ -import cn.ac.iie.utils.ArangoDBConnect +import cn.ac.iie.utils.{ArangoDBConnect, ExecutorThreadPool, SparkSessionUtil} import cn.ac.iie.utils.SparkSessionUtil.spark import com.arangodb.entity.{BaseDocument, BaseEdgeDocument} import org.apache.spark.TaskContext -import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.sql.Row import org.slf4j.LoggerFactory @@ -20,190 +19,142 @@ import org.slf4j.LoggerFactory import scala.collection.mutable.WrappedArray.ofRef object UpdateDocument { - - val arangoManger: ArangoDBConnect = ArangoDBConnect.getInstance() + private val pool = ExecutorThreadPool.getInstance + private val arangoManger: ArangoDBConnect = ArangoDBConnect.getInstance() private val LOG = LoggerFactory.getLogger(UpdateDocument.getClass) private val baseArangoData = new BaseArangoData() - def updateDocument[T <: BaseDocument](collName: String, - historyMap: ConcurrentHashMap[Integer, ConcurrentHashMap[String, T]], - getDocumentRow: Row => T, - getNewDataRdd: Unit => RDD[Row] - ): Unit = { - baseArangoData.readHistoryData(collName, historyMap, classOf[T]) + def update(): Unit = { + try { + updateDocument("FQDN", historyVertexFqdnMap, getVertexFqdnRow, classOf[BaseDocument], mergeVertexFqdn) + updateDocument("IP", historyVertexIpMap, getVertexIpRow, classOf[BaseDocument], mergeVertexIp) + updateDocument("R_LOCATE_FQDN2IP", historyRelationFqdnAddressIpMap, getRelationFqdnLocateIpRow, classOf[BaseEdgeDocument], mergeRelationFqdnLocateIp) + } catch { + case e: Exception => e.printStackTrace() + } finally { + pool.shutdown() + arangoManger.clean() + SparkSessionUtil.closeSpark() + } + } + + private def updateDocument[T <: BaseDocument](collName: String, + historyMap: ConcurrentHashMap[Integer, ConcurrentHashMap[String, T]], + getDocumentRow: (Row, ConcurrentHashMap[String, T]) => T, + clazz: Class[T], + getNewDataRdd: () => RDD[Row] + ): Unit = { + baseArangoData.readHistoryData(collName, historyMap, clazz) val hisBc = spark.sparkContext.broadcast(historyMap) - try { - - } - } - - def updateVertexFqdn(): Unit = { - baseArangoData.readHistoryData("FQDN", historyVertexFqdnMap, classOf[BaseDocument]) - val hisVerFqdnBc = spark.sparkContext.broadcast(historyVertexFqdnMap) try { val start = System.currentTimeMillis() - val mergeVertexFqdnDf: RDD[Row] = mergeVertexFqdn() - mergeVertexFqdnDf.foreachPartition(iter => { + val newDataRdd = getNewDataRdd() + newDataRdd.foreachPartition(iter => { val partitionId: Int = TaskContext.get.partitionId - val hisVerFqdnMapTmp = hisVerFqdnBc.value.get(partitionId) - val resultDocumentList: util.ArrayList[BaseDocument] = new util.ArrayList[BaseDocument] + val dictionaryMap: ConcurrentHashMap[String, T] = hisBc.value.get(partitionId) + val resultDocumentList = new util.ArrayList[T] var i = 0 iter.foreach(row => { - val fqdn = row.getAs[String]("FQDN") - val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME") - val firstFoundTime = row.getAs[Long]("FIRST_FOUND_TIME") - var document: BaseDocument = hisVerFqdnMapTmp.getOrDefault(fqdn, null) - if (document != null) { - updateMaxAttribute(document, lastFoundTime, "LAST_FOUND_TIME") - } else { - document = new BaseDocument 
- document.setKey(fqdn) - document.addAttribute("FQDN_NAME", fqdn) - document.addAttribute("FIRST_FOUND_TIME", firstFoundTime) - document.addAttribute("LAST_FOUND_TIME", lastFoundTime) - } + val document = getDocumentRow(row, dictionaryMap) resultDocumentList.add(document) i += 1 if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) { - arangoManger.overwrite(resultDocumentList, "FQDN") - LOG.warn("更新FQDN:" + i) + arangoManger.overwrite(resultDocumentList, collName) + LOG.warn(s"更新:$collName" + i) i = 0 } }) if (i != 0) { - arangoManger.overwrite(resultDocumentList, "FQDN") - LOG.warn("更新FQDN:" + i) + arangoManger.overwrite(resultDocumentList, collName) + LOG.warn(s"更新$collName:" + i) } }) val last = System.currentTimeMillis() - LOG.warn(s"更新FQDN时间:${last - start}") + LOG.warn(s"更新$collName 时间:${last - start}") } catch { case e: Exception => e.printStackTrace() } finally { - hisVerFqdnBc.destroy() + hisBc.destroy() } } - def updateVertexIp(): Unit = { - baseArangoData.readHistoryData("IP", historyVertexIpMap, classOf[BaseDocument]) - val hisVerIpBc = spark.sparkContext.broadcast(historyVertexIpMap) - try { - val start = System.currentTimeMillis() - val mergeVertexIpDf = mergeVertexIp() - mergeVertexIpDf.foreachPartition(iter => { - val partitionId: Int = TaskContext.get.partitionId - val hisVerIpMapTmp = hisVerIpBc.value.get(partitionId) - val resultDocumentList: util.ArrayList[BaseDocument] = new util.ArrayList[BaseDocument] - var i = 0 - iter.foreach(row => { - val ip = row.getAs[String]("IP") - val firstFoundTime = row.getAs[Long]("FIRST_FOUND_TIME") - val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME") - val sessionCountList = row.getAs[ofRef[AnyRef]]("SESSION_COUNT_LIST") - val bytesSumList = row.getAs[ofRef[AnyRef]]("BYTES_SUM_LIST") - val ipTypeList = row.getAs[ofRef[String]]("ip_type_list") - val sepAttributeTuple = separateAttributeByIpType(ipTypeList, sessionCountList, bytesSumList) - - var document = hisVerIpMapTmp.getOrDefault(ip, null) - if (document != null) { - updateMaxAttribute(document, lastFoundTime, "LAST_FOUND_TIME") - updateSumAttribute(document, sepAttributeTuple._1, "SERVER_SESSION_COUNT") - updateSumAttribute(document, sepAttributeTuple._2, "SERVER_BYTES_SUM") - updateSumAttribute(document, sepAttributeTuple._3, "CLIENT_SESSION_COUNT") - updateSumAttribute(document, sepAttributeTuple._4, "CLIENT_BYTES_SUM") - } else { - document = new BaseDocument - document.setKey(ip) - document.addAttribute("IP", ip) - document.addAttribute("FIRST_FOUND_TIME", firstFoundTime) - document.addAttribute("LAST_FOUND_TIME", lastFoundTime) - document.addAttribute("SERVER_SESSION_COUNT", sepAttributeTuple._1) - document.addAttribute("SERVER_BYTES_SUM", sepAttributeTuple._2) - document.addAttribute("CLIENT_SESSION_COUNT", sepAttributeTuple._3) - document.addAttribute("CLIENT_BYTES_SUM", sepAttributeTuple._4) - document.addAttribute("COMMON_LINK_INFO", "") - } - resultDocumentList.add(document) - i += 1 - if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) { - arangoManger.overwrite(resultDocumentList, "IP") - LOG.warn("更新IP:" + i) - i = 0 - } - }) - if (i != 0) { - arangoManger.overwrite(resultDocumentList, "IP") - LOG.warn("更新IP:" + i) - } - }) - val last = System.currentTimeMillis() - LOG.warn(s"更新IP时间:${last - start}") - } catch { - case e: Exception => e.printStackTrace() - } finally { - hisVerIpBc.destroy() + private def getVertexFqdnRow(row: Row, dictionaryMap: ConcurrentHashMap[String, BaseDocument]): BaseDocument = { + val fqdn = row.getAs[String]("FQDN") + val lastFoundTime = 
row.getAs[Long]("LAST_FOUND_TIME") + val firstFoundTime = row.getAs[Long]("FIRST_FOUND_TIME") + var document: BaseDocument = dictionaryMap.getOrDefault(fqdn, null) + if (document != null) { + updateMaxAttribute(document, lastFoundTime, "LAST_FOUND_TIME") + } else { + document = new BaseDocument + document.setKey(fqdn) + document.addAttribute("FQDN_NAME", fqdn) + document.addAttribute("FIRST_FOUND_TIME", firstFoundTime) + document.addAttribute("LAST_FOUND_TIME", lastFoundTime) } + document } - def updateRelationFqdnLocateIp(): Unit = { - baseArangoData.readHistoryData("R_LOCATE_FQDN2IP", historyRelationFqdnAddressIpMap, classOf[BaseEdgeDocument]) - val hisReFqdnLocIpBc = spark.sparkContext.broadcast(historyRelationFqdnAddressIpMap) - try { - val start = System.currentTimeMillis() - val mergeRelationFqdnLocateIpDf = mergeRelationFqdnLocateIp() - mergeRelationFqdnLocateIpDf.foreachPartition(iter => { - val partitionId: Int = TaskContext.get.partitionId - val hisRelaFqdnLocaIpMapTmp = hisReFqdnLocIpBc.value.get(partitionId) - val resultDocumentList: util.ArrayList[BaseEdgeDocument] = new util.ArrayList[BaseEdgeDocument] - var i = 0 - iter.foreach(row => { - val fqdn = row.getAs[String]("FQDN") - val serverIp = row.getAs[String]("common_server_ip") - val firstFoundTime = row.getAs[Long]("FIRST_FOUND_TIME") - val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME") - val countTotalList = row.getAs[ofRef[AnyRef]]("COUNT_TOTAL_LIST") - val schemaTypeList = row.getAs[ofRef[AnyRef]]("schema_type_list") - val distCipRecent = row.getAs[ofRef[ofRef[String]]]("DIST_CIP_RECENT") + private def getVertexIpRow(row: Row, dictionaryMap: ConcurrentHashMap[String, BaseDocument]): BaseDocument = { + val ip = row.getAs[String]("IP") + val firstFoundTime = row.getAs[Long]("FIRST_FOUND_TIME") + val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME") + val sessionCountList = row.getAs[ofRef[AnyRef]]("SESSION_COUNT_LIST") + val bytesSumList = row.getAs[ofRef[AnyRef]]("BYTES_SUM_LIST") + val ipTypeList = row.getAs[ofRef[String]]("ip_type_list") + val sepAttributeTuple = separateAttributeByIpType(ipTypeList, sessionCountList, bytesSumList) - val sepAttritubeMap: Map[String, Long] = separateAttributeByProtocol(schemaTypeList, countTotalList) - val distinctIp: Array[String] = mergeDistinctIp(distCipRecent) - - val key = fqdn.concat("-" + serverIp) - var document: BaseEdgeDocument = hisRelaFqdnLocaIpMapTmp.getOrDefault(key, null) - if (document != null) { - updateMaxAttribute(document, lastFoundTime, "LAST_FOUND_TIME") - updateProtocolAttritube(document, sepAttritubeMap) - updateDistinctIp(document, distinctIp) - } else { - document = new BaseEdgeDocument() - document.setKey(key) - document.setFrom("FQDN/" + fqdn) - document.setTo("IP/" + serverIp) - document.addAttribute("FIRST_FOUND_TIME", firstFoundTime) - document.addAttribute("LAST_FOUND_TIME", lastFoundTime) - putProtocolAttritube(document, sepAttritubeMap) - putDistinctIp(document, distinctIp) - } - resultDocumentList.add(document) - i += 1 - if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) { - arangoManger.overwrite(resultDocumentList, "R_LOCATE_FQDN2IP") - LOG.warn("更新R_LOCATE_FQDN2IP:" + i) - i = 0 - } - }) - if (i != 0) { - arangoManger.overwrite(resultDocumentList, "R_LOCATE_FQDN2IP") - LOG.warn("更新R_LOCATE_FQDN2IP:" + i) - } - }) - val last = System.currentTimeMillis() - LOG.warn(s"更新R_LOCATE_FQDN2IP时间:${last - start}") - } catch { - case e: Exception => e.printStackTrace() - } finally { - hisReFqdnLocIpBc.destroy() + var document = dictionaryMap.getOrDefault(ip, 
null) + if (document != null) { + updateMaxAttribute(document, lastFoundTime, "LAST_FOUND_TIME") + updateSumAttribute(document, sepAttributeTuple._1, "SERVER_SESSION_COUNT") + updateSumAttribute(document, sepAttributeTuple._2, "SERVER_BYTES_SUM") + updateSumAttribute(document, sepAttributeTuple._3, "CLIENT_SESSION_COUNT") + updateSumAttribute(document, sepAttributeTuple._4, "CLIENT_BYTES_SUM") + } else { + document = new BaseDocument + document.setKey(ip) + document.addAttribute("IP", ip) + document.addAttribute("FIRST_FOUND_TIME", firstFoundTime) + document.addAttribute("LAST_FOUND_TIME", lastFoundTime) + document.addAttribute("SERVER_SESSION_COUNT", sepAttributeTuple._1) + document.addAttribute("SERVER_BYTES_SUM", sepAttributeTuple._2) + document.addAttribute("CLIENT_SESSION_COUNT", sepAttributeTuple._3) + document.addAttribute("CLIENT_BYTES_SUM", sepAttributeTuple._4) + document.addAttribute("COMMON_LINK_INFO", "") } + document + } + + private def getRelationFqdnLocateIpRow(row: Row, dictionaryMap: ConcurrentHashMap[String, BaseEdgeDocument]): BaseEdgeDocument = { + val fqdn = row.getAs[String]("FQDN") + val serverIp = row.getAs[String]("common_server_ip") + val firstFoundTime = row.getAs[Long]("FIRST_FOUND_TIME") + val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME") + val countTotalList = row.getAs[ofRef[AnyRef]]("COUNT_TOTAL_LIST") + val schemaTypeList = row.getAs[ofRef[AnyRef]]("schema_type_list") + val distCipRecent = row.getAs[ofRef[ofRef[String]]]("DIST_CIP_RECENT") + + val sepAttritubeMap: Map[String, Long] = separateAttributeByProtocol(schemaTypeList, countTotalList) + val distinctIp: Array[String] = mergeDistinctIp(distCipRecent) + + val key = fqdn.concat("-" + serverIp) + var document = dictionaryMap.getOrDefault(key, null) + if (document != null) { + updateMaxAttribute(document, lastFoundTime, "LAST_FOUND_TIME") + updateProtocolAttritube(document, sepAttritubeMap) + updateDistinctIp(document, distinctIp) + } else { + document = new BaseEdgeDocument() + document.setKey(key) + document.setFrom("FQDN/" + fqdn) + document.setTo("IP/" + serverIp) + document.addAttribute("FIRST_FOUND_TIME", firstFoundTime) + document.addAttribute("LAST_FOUND_TIME", lastFoundTime) + putProtocolAttritube(document, sepAttritubeMap) + putDistinctIp(document, distinctIp) + } + document } }
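Note on the refactor in PATCH 4/4: update() now drives every collection through the single generic updateDocument[T <: BaseDocument] routine, which reads the history data into a per-partition ConcurrentHashMap, broadcasts it, matches each merged Row against the broadcast dictionary through the supplied getDocumentRow function, and flushes batches of ApplicationConfig.UPDATE_ARANGO_BATCH documents to ArangoDB via arangoManger.overwrite. The sketch below is a minimal illustration of how a further collection could be wired into the same flow; the "ASN" collection name, historyVertexAsnMap, and mergeVertexAsn() are hypothetical stand-ins and do not appear in the patch — only the updateDocument signature and the UpdateDocHandler helper updateMaxAttribute come from the diffs above.

    // Hypothetical example only: "ASN", historyVertexAsnMap and mergeVertexAsn()
    // are placeholders for any new vertex collection added to the graph.
    private def getVertexAsnRow(row: Row, dictionaryMap: ConcurrentHashMap[String, BaseDocument]): BaseDocument = {
      val asn = row.getAs[String]("ASN")
      val firstFoundTime = row.getAs[Long]("FIRST_FOUND_TIME")
      val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME")
      var document = dictionaryMap.getOrDefault(asn, null)
      if (document != null) {
        // existing vertex: only push LAST_FOUND_TIME forward
        updateMaxAttribute(document, lastFoundTime, "LAST_FOUND_TIME")
      } else {
        // first sighting: build a fresh document keyed by the ASN itself
        document = new BaseDocument
        document.setKey(asn)
        document.addAttribute("ASN", asn)
        document.addAttribute("FIRST_FOUND_TIME", firstFoundTime)
        document.addAttribute("LAST_FOUND_TIME", lastFoundTime)
      }
      document
    }

    // Wiring inside update(), next to the three existing calls:
    // updateDocument("ASN", historyVertexAsnMap, getVertexAsnRow, classOf[BaseDocument], mergeVertexAsn)

Because the row-to-document mapping and the merge query are the only collection-specific pieces, each new vertex or edge type reduces to one getXxxRow function plus one merge function, while batching, broadcast handling, logging and cleanup stay in updateDocument.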