diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
index 038e301..b417624 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
@@ -1,5 +1,7 @@
 package cn.ac.iie.service.update
+
 import java.util
+import java.util.concurrent.ConcurrentHashMap
 import cn.ac.iie.config.ApplicationConfig
 import cn.ac.iie.dao.BaseArangoData
 
@@ -10,6 +12,7 @@ import cn.ac.iie.utils.ArangoDBConnect
 import cn.ac.iie.utils.SparkSessionUtil.spark
 import com.arangodb.entity.{BaseDocument, BaseEdgeDocument}
 import org.apache.spark.TaskContext
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.Row
 import org.slf4j.LoggerFactory
@@ -22,12 +25,20 @@ object UpdateDocument {
   private val LOG = LoggerFactory.getLogger(UpdateDocument.getClass)
   private val baseArangoData = new BaseArangoData()
 
-  def updateDocument(): Unit ={
+  def updateDocument[T <: BaseDocument](collName: String,
+                                        historyMap: ConcurrentHashMap[Integer, ConcurrentHashMap[String, T]],
+                                        getDocumentRow: Row => T,
+                                        getNewDataRdd: Unit => RDD[Row]
+                                       ): Unit = {
+    baseArangoData.readHistoryData(collName, historyMap, classOf[T])
+    val hisBc = spark.sparkContext.broadcast(historyMap)
+    try {
+    }
 
   }
 
-  def updateVertexFqdn(): Unit ={
-    baseArangoData.readHistoryData("FQDN",historyVertexFqdnMap,classOf[BaseDocument])
+  def updateVertexFqdn(): Unit = {
+    baseArangoData.readHistoryData("FQDN", historyVertexFqdnMap, classOf[BaseDocument])
     val hisVerFqdnBc = spark.sparkContext.broadcast(historyVertexFqdnMap)
     try {
       val start = System.currentTimeMillis()
@@ -41,19 +52,19 @@ object UpdateDocument {
           val fqdn = row.getAs[String]("FQDN")
           val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME")
           val firstFoundTime = row.getAs[Long]("FIRST_FOUND_TIME")
-          var document: BaseDocument = hisVerFqdnMapTmp.getOrDefault(fqdn,null)
-          if (document != null){
-            updateMaxAttribute(document,lastFoundTime,"LAST_FOUND_TIME")
-          } else{
+          var document: BaseDocument = hisVerFqdnMapTmp.getOrDefault(fqdn, null)
+          if (document != null) {
+            updateMaxAttribute(document, lastFoundTime, "LAST_FOUND_TIME")
+          } else {
             document = new BaseDocument
             document.setKey(fqdn)
-            document.addAttribute("FQDN_NAME",fqdn)
-            document.addAttribute("FIRST_FOUND_TIME",firstFoundTime)
-            document.addAttribute("LAST_FOUND_TIME",lastFoundTime)
+            document.addAttribute("FQDN_NAME", fqdn)
+            document.addAttribute("FIRST_FOUND_TIME", firstFoundTime)
+            document.addAttribute("LAST_FOUND_TIME", lastFoundTime)
           }
           resultDocumentList.add(document)
-          i+=1
-          if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH){
+          i += 1
+          if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) {
             arangoManger.overwrite(resultDocumentList, "FQDN")
             LOG.warn("更新FQDN:" + i)
             i = 0
@@ -65,16 +76,16 @@ object UpdateDocument {
         }
       })
       val last = System.currentTimeMillis()
-      LOG.warn(s"更新FQDN时间:${last-start}")
-    }catch {
-      case e:Exception => e.printStackTrace()
-    }finally {
+      LOG.warn(s"更新FQDN时间:${last - start}")
+    } catch {
+      case e: Exception => e.printStackTrace()
+    } finally {
       hisVerFqdnBc.destroy()
     }
   }
 
-  def updateVertexIp(): Unit ={
-    baseArangoData.readHistoryData("IP",historyVertexIpMap,classOf[BaseDocument])
+  def updateVertexIp(): Unit = {
+    baseArangoData.readHistoryData("IP", historyVertexIpMap, classOf[BaseDocument])
     val hisVerIpBc = spark.sparkContext.broadcast(historyVertexIpMap)
     try {
       val start = System.currentTimeMillis()
@@ -91,30 +102,30 @@ object UpdateDocument {
          val sessionCountList = row.getAs[ofRef[AnyRef]]("SESSION_COUNT_LIST")
          val bytesSumList = row.getAs[ofRef[AnyRef]]("BYTES_SUM_LIST")
          val ipTypeList = row.getAs[ofRef[String]]("ip_type_list")
-          val sepAttributeTuple = separateAttributeByIpType(ipTypeList,sessionCountList,bytesSumList)
+          val sepAttributeTuple = separateAttributeByIpType(ipTypeList, sessionCountList, bytesSumList)
 
-          var document = hisVerIpMapTmp.getOrDefault(ip,null)
-          if (document != null){
-            updateMaxAttribute(document,lastFoundTime,"LAST_FOUND_TIME")
-            updateSumAttribute(document,sepAttributeTuple._1,"SERVER_SESSION_COUNT")
-            updateSumAttribute(document,sepAttributeTuple._2,"SERVER_BYTES_SUM")
-            updateSumAttribute(document,sepAttributeTuple._3,"CLIENT_SESSION_COUNT")
-            updateSumAttribute(document,sepAttributeTuple._4,"CLIENT_BYTES_SUM")
+          var document = hisVerIpMapTmp.getOrDefault(ip, null)
+          if (document != null) {
+            updateMaxAttribute(document, lastFoundTime, "LAST_FOUND_TIME")
+            updateSumAttribute(document, sepAttributeTuple._1, "SERVER_SESSION_COUNT")
+            updateSumAttribute(document, sepAttributeTuple._2, "SERVER_BYTES_SUM")
+            updateSumAttribute(document, sepAttributeTuple._3, "CLIENT_SESSION_COUNT")
+            updateSumAttribute(document, sepAttributeTuple._4, "CLIENT_BYTES_SUM")
           } else {
            document = new BaseDocument
            document.setKey(ip)
-            document.addAttribute("IP",ip)
-            document.addAttribute("FIRST_FOUND_TIME",firstFoundTime)
-            document.addAttribute("LAST_FOUND_TIME",lastFoundTime)
-            document.addAttribute("SERVER_SESSION_COUNT",sepAttributeTuple._1)
-            document.addAttribute("SERVER_BYTES_SUM",sepAttributeTuple._2)
-            document.addAttribute("CLIENT_SESSION_COUNT",sepAttributeTuple._3)
-            document.addAttribute("CLIENT_BYTES_SUM",sepAttributeTuple._4)
-            document.addAttribute("COMMON_LINK_INFO","")
+            document.addAttribute("IP", ip)
+            document.addAttribute("FIRST_FOUND_TIME", firstFoundTime)
+            document.addAttribute("LAST_FOUND_TIME", lastFoundTime)
+            document.addAttribute("SERVER_SESSION_COUNT", sepAttributeTuple._1)
+            document.addAttribute("SERVER_BYTES_SUM", sepAttributeTuple._2)
+            document.addAttribute("CLIENT_SESSION_COUNT", sepAttributeTuple._3)
+            document.addAttribute("CLIENT_BYTES_SUM", sepAttributeTuple._4)
+            document.addAttribute("COMMON_LINK_INFO", "")
           }
           resultDocumentList.add(document)
-          i+=1
-          if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH){
+          i += 1
+          if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) {
            arangoManger.overwrite(resultDocumentList, "IP")
            LOG.warn("更新IP:" + i)
            i = 0
@@ -126,15 +137,15 @@ object UpdateDocument {
         }
       })
       val last = System.currentTimeMillis()
-      LOG.warn(s"更新IP时间:${last-start}")
-    }catch {
-      case e:Exception => e.printStackTrace()
-    }finally {
+      LOG.warn(s"更新IP时间:${last - start}")
+    } catch {
+      case e: Exception => e.printStackTrace()
+    } finally {
       hisVerIpBc.destroy()
     }
   }
 
-  def updateRelationFqdnLocateIp(): Unit ={
+  def updateRelationFqdnLocateIp(): Unit = {
     baseArangoData.readHistoryData("R_LOCATE_FQDN2IP", historyRelationFqdnAddressIpMap, classOf[BaseEdgeDocument])
     val hisReFqdnLocIpBc = spark.sparkContext.broadcast(historyRelationFqdnAddressIpMap)
     try {
@@ -154,28 +165,28 @@ object UpdateDocument {
          val schemaTypeList = row.getAs[ofRef[AnyRef]]("schema_type_list")
          val distCipRecent = row.getAs[ofRef[ofRef[String]]]("DIST_CIP_RECENT")
 
-          val sepAttritubeMap: Map[String, Long] = separateAttributeByProtocol(schemaTypeList,countTotalList)
+          val sepAttritubeMap: Map[String, Long] = separateAttributeByProtocol(schemaTypeList, countTotalList)
           val distinctIp: Array[String] = mergeDistinctIp(distCipRecent)
 
-          val key = fqdn.concat("-"+serverIp)
-          var document: BaseEdgeDocument = hisRelaFqdnLocaIpMapTmp.getOrDefault(key,null)
-          if (document != null){
-            updateMaxAttribute(document,lastFoundTime,"LAST_FOUND_TIME")
-            updateProtocolAttritube(document,sepAttritubeMap)
-            updateDistinctIp(document,distinctIp)
-          }else {
+          val key = fqdn.concat("-" + serverIp)
+          var document: BaseEdgeDocument = hisRelaFqdnLocaIpMapTmp.getOrDefault(key, null)
+          if (document != null) {
+            updateMaxAttribute(document, lastFoundTime, "LAST_FOUND_TIME")
+            updateProtocolAttritube(document, sepAttritubeMap)
+            updateDistinctIp(document, distinctIp)
+          } else {
            document = new BaseEdgeDocument()
            document.setKey(key)
            document.setFrom("FQDN/" + fqdn)
            document.setTo("IP/" + serverIp)
            document.addAttribute("FIRST_FOUND_TIME", firstFoundTime)
            document.addAttribute("LAST_FOUND_TIME", lastFoundTime)
-            putProtocolAttritube(document,sepAttritubeMap)
-            putDistinctIp(document,distinctIp)
+            putProtocolAttritube(document, sepAttritubeMap)
+            putDistinctIp(document, distinctIp)
           }
           resultDocumentList.add(document)
-          i+=1
-          if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH){
+          i += 1
+          if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) {
            arangoManger.overwrite(resultDocumentList, "R_LOCATE_FQDN2IP")
            LOG.warn("更新R_LOCATE_FQDN2IP:" + i)
            i = 0
@@ -187,10 +198,10 @@ object UpdateDocument {
         }
       })
       val last = System.currentTimeMillis()
-      LOG.warn(s"更新R_LOCATE_FQDN2IP时间:${last-start}")
-    }catch {
-      case e:Exception => e.printStackTrace()
-    }finally {
+      LOG.warn(s"更新R_LOCATE_FQDN2IP时间:${last - start}")
+    } catch {
+      case e: Exception => e.printStackTrace()
+    } finally {
       hisReFqdnLocIpBc.destroy()
     }
   }
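
Note on the new generic helper introduced in the first hunk: as committed, its `try` block is empty, `classOf[T]` does not compile for an unbound type parameter in Scala (a concrete `Class[T]` or a `ClassTag[T]` is needed), and `getNewDataRdd` is typed `Unit => RDD[Row]` rather than the usual `() => RDD[Row]` thunk. Below is a minimal sketch of one way the helper could be completed, modeled on the pattern already used in `updateVertexFqdn`. The `clazz`, `getDocumentKey` and `mergeDocument` parameters, the per-partition `new ArangoDBConnect()` construction, and the end-of-partition flush are illustrative assumptions, not part of this diff.

```scala
// Sketch only. Assumptions (not in the diff): an explicit Class[T] replaces classOf[T],
// getDocumentKey/mergeDocument are hypothetical hooks replacing getDocumentRow, and
// getNewDataRdd is taken as a () => RDD[Row] thunk.
def updateDocument[T <: BaseDocument](collName: String,
                                      historyMap: ConcurrentHashMap[Integer, ConcurrentHashMap[String, T]],
                                      clazz: Class[T],
                                      getDocumentKey: Row => String,
                                      mergeDocument: (Row, Option[T]) => T,
                                      getNewDataRdd: () => RDD[Row]): Unit = {
  baseArangoData.readHistoryData(collName, historyMap, clazz)
  val hisBc: Broadcast[ConcurrentHashMap[Integer, ConcurrentHashMap[String, T]]] =
    spark.sparkContext.broadcast(historyMap)
  try {
    val start = System.currentTimeMillis()
    getNewDataRdd().foreachPartition { partition =>
      // Per-partition slice of the broadcast history, like hisVerFqdnMapTmp in updateVertexFqdn.
      val hisMapTmp = hisBc.value.getOrDefault(TaskContext.getPartitionId(), new ConcurrentHashMap[String, T]())
      val arangoManger = new ArangoDBConnect() // assumption: built per partition as in the existing update* methods
      val resultDocumentList = new util.ArrayList[BaseDocument]()
      var i = 0
      partition.foreach { row =>
        // Reuse the historical document when the key is known, otherwise build a fresh one.
        val document = mergeDocument(row, Option(hisMapTmp.get(getDocumentKey(row))))
        resultDocumentList.add(document)
        i += 1
        if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) {
          arangoManger.overwrite(resultDocumentList, collName)
          resultDocumentList.clear()
          i = 0
        }
      }
      // Flush whatever is left in the last, partially filled batch.
      if (i > 0) arangoManger.overwrite(resultDocumentList, collName)
    }
    LOG.warn(s"update $collName took ${System.currentTimeMillis() - start} ms")
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    hisBc.destroy()
  }
}
```

With a helper of this shape, `updateVertexFqdn`, `updateVertexIp` and `updateRelationFqdnLocateIp` would each reduce to a call that supplies the collection name, the history map, and the collection-specific key and merge logic, which appears to be the direction this commit is heading.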