IP Learning tsg项目 spark版本首次提交

This commit is contained in:
wanglihui
2020-08-07 10:39:22 +08:00
parent 6ea2518ee4
commit 2a4d6dda4a

View File

@@ -1,5 +1,7 @@
package cn.ac.iie.service.update
import java.util
import java.util.concurrent.ConcurrentHashMap
import cn.ac.iie.config.ApplicationConfig
import cn.ac.iie.dao.BaseArangoData
@@ -10,6 +12,7 @@ import cn.ac.iie.utils.ArangoDBConnect
import cn.ac.iie.utils.SparkSessionUtil.spark
import com.arangodb.entity.{BaseDocument, BaseEdgeDocument}
import org.apache.spark.TaskContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.slf4j.LoggerFactory
@@ -22,12 +25,20 @@ object UpdateDocument {
  private val LOG = LoggerFactory.getLogger(UpdateDocument.getClass)
  private val baseArangoData = new BaseArangoData()
  def updateDocument[T <: BaseDocument](collName: String,
                                        historyMap: ConcurrentHashMap[Integer, ConcurrentHashMap[String, T]],
                                        getDocumentRow: Row => T,
                                        getNewDataRdd: Unit => RDD[Row]
                                       ): Unit = {
    baseArangoData.readHistoryData(collName, historyMap, classOf[T])
    val hisBc = spark.sparkContext.broadcast(historyMap)
    try {
    }
  }
  def updateVertexFqdn(): Unit = {
    baseArangoData.readHistoryData("FQDN", historyVertexFqdnMap, classOf[BaseDocument])
    val hisVerFqdnBc = spark.sparkContext.broadcast(historyVertexFqdnMap)
    try {
      val start = System.currentTimeMillis()
@@ -41,19 +52,19 @@ object UpdateDocument {
        val fqdn = row.getAs[String]("FQDN")
        val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME")
        val firstFoundTime = row.getAs[Long]("FIRST_FOUND_TIME")
        var document: BaseDocument = hisVerFqdnMapTmp.getOrDefault(fqdn, null)
        if (document != null) {
          updateMaxAttribute(document, lastFoundTime, "LAST_FOUND_TIME")
        } else {
          document = new BaseDocument
          document.setKey(fqdn)
          document.addAttribute("FQDN_NAME", fqdn)
          document.addAttribute("FIRST_FOUND_TIME", firstFoundTime)
          document.addAttribute("LAST_FOUND_TIME", lastFoundTime)
        }
        resultDocumentList.add(document)
        i += 1
        if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) {
          arangoManger.overwrite(resultDocumentList, "FQDN")
          LOG.warn("更新FQDN:" + i)
          i = 0
@@ -65,16 +76,16 @@ object UpdateDocument {
        }
      })
      val last = System.currentTimeMillis()
      LOG.warn(s"更新FQDN时间${last - start}")
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      hisVerFqdnBc.destroy()
    }
  }
  def updateVertexIp(): Unit = {
    baseArangoData.readHistoryData("IP", historyVertexIpMap, classOf[BaseDocument])
    val hisVerIpBc = spark.sparkContext.broadcast(historyVertexIpMap)
    try {
      val start = System.currentTimeMillis()
@@ -91,30 +102,30 @@ object UpdateDocument {
        val sessionCountList = row.getAs[ofRef[AnyRef]]("SESSION_COUNT_LIST")
        val bytesSumList = row.getAs[ofRef[AnyRef]]("BYTES_SUM_LIST")
        val ipTypeList = row.getAs[ofRef[String]]("ip_type_list")
        val sepAttributeTuple = separateAttributeByIpType(ipTypeList, sessionCountList, bytesSumList)
        var document = hisVerIpMapTmp.getOrDefault(ip, null)
        if (document != null) {
          updateMaxAttribute(document, lastFoundTime, "LAST_FOUND_TIME")
          updateSumAttribute(document, sepAttributeTuple._1, "SERVER_SESSION_COUNT")
          updateSumAttribute(document, sepAttributeTuple._2, "SERVER_BYTES_SUM")
          updateSumAttribute(document, sepAttributeTuple._3, "CLIENT_SESSION_COUNT")
          updateSumAttribute(document, sepAttributeTuple._4, "CLIENT_BYTES_SUM")
        } else {
          document = new BaseDocument
          document.setKey(ip)
          document.addAttribute("IP", ip)
          document.addAttribute("FIRST_FOUND_TIME", firstFoundTime)
          document.addAttribute("LAST_FOUND_TIME", lastFoundTime)
          document.addAttribute("SERVER_SESSION_COUNT", sepAttributeTuple._1)
          document.addAttribute("SERVER_BYTES_SUM", sepAttributeTuple._2)
          document.addAttribute("CLIENT_SESSION_COUNT", sepAttributeTuple._3)
          document.addAttribute("CLIENT_BYTES_SUM", sepAttributeTuple._4)
          document.addAttribute("COMMON_LINK_INFO", "")
        }
        resultDocumentList.add(document)
        i += 1
        if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) {
          arangoManger.overwrite(resultDocumentList, "IP")
          LOG.warn("更新IP:" + i)
          i = 0
@@ -126,15 +137,15 @@ object UpdateDocument {
        }
      })
      val last = System.currentTimeMillis()
      LOG.warn(s"更新IP时间${last - start}")
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      hisVerIpBc.destroy()
    }
  }
  def updateRelationFqdnLocateIp(): Unit = {
    baseArangoData.readHistoryData("R_LOCATE_FQDN2IP", historyRelationFqdnAddressIpMap, classOf[BaseEdgeDocument])
    val hisReFqdnLocIpBc = spark.sparkContext.broadcast(historyRelationFqdnAddressIpMap)
    try {
@@ -154,28 +165,28 @@ object UpdateDocument {
        val schemaTypeList = row.getAs[ofRef[AnyRef]]("schema_type_list")
        val distCipRecent = row.getAs[ofRef[ofRef[String]]]("DIST_CIP_RECENT")
        val sepAttritubeMap: Map[String, Long] = separateAttributeByProtocol(schemaTypeList, countTotalList)
        val distinctIp: Array[String] = mergeDistinctIp(distCipRecent)
        val key = fqdn.concat("-" + serverIp)
        var document: BaseEdgeDocument = hisRelaFqdnLocaIpMapTmp.getOrDefault(key, null)
        if (document != null) {
          updateMaxAttribute(document, lastFoundTime, "LAST_FOUND_TIME")
          updateProtocolAttritube(document, sepAttritubeMap)
          updateDistinctIp(document, distinctIp)
        } else {
          document = new BaseEdgeDocument()
          document.setKey(key)
          document.setFrom("FQDN/" + fqdn)
          document.setTo("IP/" + serverIp)
          document.addAttribute("FIRST_FOUND_TIME", firstFoundTime)
          document.addAttribute("LAST_FOUND_TIME", lastFoundTime)
          putProtocolAttritube(document, sepAttritubeMap)
          putDistinctIp(document, distinctIp)
        }
        resultDocumentList.add(document)
        i += 1
        if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) {
          arangoManger.overwrite(resultDocumentList, "R_LOCATE_FQDN2IP")
          LOG.warn("更新R_LOCATE_FQDN2IP:" + i)
          i = 0
@@ -187,10 +198,10 @@ object UpdateDocument {
        }
      })
      val last = System.currentTimeMillis()
      LOG.warn(s"更新R_LOCATE_FQDN2IP时间${last - start}")
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      hisReFqdnLocIpBc.destroy()
    }
  }