Abstract out a common update method

wanglihui
2020-08-10 18:38:15 +08:00
parent e946b506d3
commit 2592b5b8aa
3 changed files with 110 additions and 171 deletions

View File

@@ -42,4 +42,4 @@ update.arango.batch=10000
 distinct.client.ip.num=10000
 recent.count.hour=24
-update.interval=3600
+update.interval=10800
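
The keys above are presumably the ones exposed to the Scala code through ApplicationConfig (the code below references ApplicationConfig.UPDATE_ARANGO_BATCH, which looks like update.arango.batch); the change raises the refresh interval from 3600 s (one hour) to 10800 s (three hours). The loader itself is not part of this commit, so this is only a minimal sketch, assuming the keys live in an application.properties on the classpath; every name other than the property keys is hypothetical.

```scala
import java.util.Properties

// Hypothetical sketch of a properties loader; the repo's real ApplicationConfig may differ.
object ApplicationConfigSketch {
  private val props = new Properties()
  // Assumption: the keys shown in the diff sit in application.properties on the classpath.
  Option(getClass.getResourceAsStream("/application.properties")).foreach(in => props.load(in))

  val UPDATE_ARANGO_BATCH: Int = props.getProperty("update.arango.batch", "10000").toInt
  val DISTINCT_CLIENT_IP_NUM: Int = props.getProperty("distinct.client.ip.num", "10000").toInt
  val RECENT_COUNT_HOUR: Int = props.getProperty("recent.count.hour", "24").toInt
  // Raised by this commit from 3600 s (hourly) to 10800 s (every three hours).
  val UPDATE_INTERVAL_SECONDS: Long = props.getProperty("update.interval", "10800").toLong
}
```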

View File

@@ -1,22 +1,10 @@
 package cn.ac.iie.main

-import cn.ac.iie.service.update.UpdateDocument._
-import cn.ac.iie.utils.{ExecutorThreadPool, SparkSessionUtil}
+import cn.ac.iie.service.update.UpdateDocument

 object IpLearningApplication {

-  private val pool = ExecutorThreadPool.getInstance
-
   def main(args: Array[String]): Unit = {
-    try {
-      updateVertexFqdn()
-      updateVertexIp()
-      updateRelationFqdnLocateIp()
-    }catch {
-      case e:Exception => e.printStackTrace()
-    }finally {
-      pool.shutdown()
-      arangoManger.clean()
-      SparkSessionUtil.closeSpark()
-    }
+    UpdateDocument.update()
   }
 }

View File

@@ -8,11 +8,10 @@ import cn.ac.iie.dao.BaseArangoData
 import cn.ac.iie.dao.BaseArangoData._
 import cn.ac.iie.service.transform.MergeDataFrame._
 import cn.ac.iie.service.update.UpdateDocHandler._
-import cn.ac.iie.utils.ArangoDBConnect
+import cn.ac.iie.utils.{ArangoDBConnect, ExecutorThreadPool, SparkSessionUtil}
 import cn.ac.iie.utils.SparkSessionUtil.spark
 import com.arangodb.entity.{BaseDocument, BaseEdgeDocument}
 import org.apache.spark.TaskContext
-import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.Row
 import org.slf4j.LoggerFactory
@@ -20,190 +19,142 @@ import org.slf4j.LoggerFactory
 import scala.collection.mutable.WrappedArray.ofRef

 object UpdateDocument {

-  val arangoManger: ArangoDBConnect = ArangoDBConnect.getInstance()
+  private val pool = ExecutorThreadPool.getInstance
+  private val arangoManger: ArangoDBConnect = ArangoDBConnect.getInstance()
   private val LOG = LoggerFactory.getLogger(UpdateDocument.getClass)
   private val baseArangoData = new BaseArangoData()

-  def updateDocument[T <: BaseDocument](collName: String,
-                                        historyMap: ConcurrentHashMap[Integer, ConcurrentHashMap[String, T]],
-                                        getDocumentRow: Row => T,
-                                        getNewDataRdd: Unit => RDD[Row]
-                                       ): Unit = {
-    baseArangoData.readHistoryData(collName, historyMap, classOf[T])
-    val hisBc = spark.sparkContext.broadcast(historyMap)
-    try {
-    }
-  }
+  def update(): Unit = {
+    try {
+      updateDocument("FQDN", historyVertexFqdnMap, getVertexFqdnRow, classOf[BaseDocument], mergeVertexFqdn)
+      updateDocument("IP", historyVertexIpMap, getVertexIpRow, classOf[BaseDocument], mergeVertexIp)
+      updateDocument("R_LOCATE_FQDN2IP", historyRelationFqdnAddressIpMap, getRelationFqdnLocateIpRow, classOf[BaseEdgeDocument], mergeRelationFqdnLocateIp)
+    } catch {
+      case e: Exception => e.printStackTrace()
+    } finally {
+      pool.shutdown()
+      arangoManger.clean()
+      SparkSessionUtil.closeSpark()
+    }
+  }
+
+  private def updateDocument[T <: BaseDocument](collName: String,
+                                                historyMap: ConcurrentHashMap[Integer, ConcurrentHashMap[String, T]],
+                                                getDocumentRow: (Row, ConcurrentHashMap[String, T]) => T,
+                                                clazz: Class[T],
+                                                getNewDataRdd: () => RDD[Row]
+                                               ): Unit = {
+    baseArangoData.readHistoryData(collName, historyMap, clazz)
+    val hisBc = spark.sparkContext.broadcast(historyMap)
+    try {
+      val start = System.currentTimeMillis()
+      val newDataRdd = getNewDataRdd()
+      newDataRdd.foreachPartition(iter => {
+        val partitionId: Int = TaskContext.get.partitionId
+        val dictionaryMap: ConcurrentHashMap[String, T] = hisBc.value.get(partitionId)
+        val resultDocumentList = new util.ArrayList[T]
+        var i = 0
+        iter.foreach(row => {
+          val document = getDocumentRow(row, dictionaryMap)
+          resultDocumentList.add(document)
+          i += 1
+          if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) {
+            arangoManger.overwrite(resultDocumentList, collName)
+            LOG.warn(s"更新:$collName" + i)
+            i = 0
+          }
+        })
+        if (i != 0) {
+          arangoManger.overwrite(resultDocumentList, collName)
+          LOG.warn(s"更新$collName:" + i)
+        }
+      })
+      val last = System.currentTimeMillis()
+      LOG.warn(s"更新$collName 时间:${last - start}")
+    } catch {
+      case e: Exception => e.printStackTrace()
+    } finally {
+      hisBc.destroy()
+    }
+  }

-  def updateVertexFqdn(): Unit = {
-    baseArangoData.readHistoryData("FQDN", historyVertexFqdnMap, classOf[BaseDocument])
-    val hisVerFqdnBc = spark.sparkContext.broadcast(historyVertexFqdnMap)
-    try {
-      val start = System.currentTimeMillis()
-      val mergeVertexFqdnDf: RDD[Row] = mergeVertexFqdn()
-      mergeVertexFqdnDf.foreachPartition(iter => {
-        val partitionId: Int = TaskContext.get.partitionId
-        val hisVerFqdnMapTmp = hisVerFqdnBc.value.get(partitionId)
-        val resultDocumentList: util.ArrayList[BaseDocument] = new util.ArrayList[BaseDocument]
-        var i = 0
-        iter.foreach(row => {
-          val fqdn = row.getAs[String]("FQDN")
-          val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME")
-          val firstFoundTime = row.getAs[Long]("FIRST_FOUND_TIME")
-          var document: BaseDocument = hisVerFqdnMapTmp.getOrDefault(fqdn, null)
-          if (document != null) {
-            updateMaxAttribute(document, lastFoundTime, "LAST_FOUND_TIME")
-          } else {
-            document = new BaseDocument
-            document.setKey(fqdn)
-            document.addAttribute("FQDN_NAME", fqdn)
-            document.addAttribute("FIRST_FOUND_TIME", firstFoundTime)
-            document.addAttribute("LAST_FOUND_TIME", lastFoundTime)
-          }
-          resultDocumentList.add(document)
-          i += 1
-          if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) {
-            arangoManger.overwrite(resultDocumentList, "FQDN")
-            LOG.warn("更新FQDN:" + i)
-            i = 0
-          }
-        })
-        if (i != 0) {
-          arangoManger.overwrite(resultDocumentList, "FQDN")
-          LOG.warn("更新FQDN:" + i)
-        }
-      })
-      val last = System.currentTimeMillis()
-      LOG.warn(s"更新FQDN时间:${last - start}")
-    } catch {
-      case e: Exception => e.printStackTrace()
-    } finally {
-      hisVerFqdnBc.destroy()
-    }
-  }
+  private def getVertexFqdnRow(row: Row, dictionaryMap: ConcurrentHashMap[String, BaseDocument]): BaseDocument = {
+    val fqdn = row.getAs[String]("FQDN")
+    val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME")
+    val firstFoundTime = row.getAs[Long]("FIRST_FOUND_TIME")
+    var document: BaseDocument = dictionaryMap.getOrDefault(fqdn, null)
+    if (document != null) {
+      updateMaxAttribute(document, lastFoundTime, "LAST_FOUND_TIME")
+    } else {
+      document = new BaseDocument
+      document.setKey(fqdn)
+      document.addAttribute("FQDN_NAME", fqdn)
+      document.addAttribute("FIRST_FOUND_TIME", firstFoundTime)
+      document.addAttribute("LAST_FOUND_TIME", lastFoundTime)
+    }
+    document
+  }

-  def updateVertexIp(): Unit = {
-    baseArangoData.readHistoryData("IP", historyVertexIpMap, classOf[BaseDocument])
-    val hisVerIpBc = spark.sparkContext.broadcast(historyVertexIpMap)
-    try {
-      val start = System.currentTimeMillis()
-      val mergeVertexIpDf = mergeVertexIp()
-      mergeVertexIpDf.foreachPartition(iter => {
-        val partitionId: Int = TaskContext.get.partitionId
-        val hisVerIpMapTmp = hisVerIpBc.value.get(partitionId)
-        val resultDocumentList: util.ArrayList[BaseDocument] = new util.ArrayList[BaseDocument]
-        var i = 0
-        iter.foreach(row => {
-          val ip = row.getAs[String]("IP")
-          val firstFoundTime = row.getAs[Long]("FIRST_FOUND_TIME")
-          val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME")
-          val sessionCountList = row.getAs[ofRef[AnyRef]]("SESSION_COUNT_LIST")
-          val bytesSumList = row.getAs[ofRef[AnyRef]]("BYTES_SUM_LIST")
-          val ipTypeList = row.getAs[ofRef[String]]("ip_type_list")
-          val sepAttributeTuple = separateAttributeByIpType(ipTypeList, sessionCountList, bytesSumList)
-          var document = hisVerIpMapTmp.getOrDefault(ip, null)
-          if (document != null) {
-            updateMaxAttribute(document, lastFoundTime, "LAST_FOUND_TIME")
-            updateSumAttribute(document, sepAttributeTuple._1, "SERVER_SESSION_COUNT")
-            updateSumAttribute(document, sepAttributeTuple._2, "SERVER_BYTES_SUM")
-            updateSumAttribute(document, sepAttributeTuple._3, "CLIENT_SESSION_COUNT")
-            updateSumAttribute(document, sepAttributeTuple._4, "CLIENT_BYTES_SUM")
-          } else {
-            document = new BaseDocument
-            document.setKey(ip)
-            document.addAttribute("IP", ip)
-            document.addAttribute("FIRST_FOUND_TIME", firstFoundTime)
-            document.addAttribute("LAST_FOUND_TIME", lastFoundTime)
-            document.addAttribute("SERVER_SESSION_COUNT", sepAttributeTuple._1)
-            document.addAttribute("SERVER_BYTES_SUM", sepAttributeTuple._2)
-            document.addAttribute("CLIENT_SESSION_COUNT", sepAttributeTuple._3)
-            document.addAttribute("CLIENT_BYTES_SUM", sepAttributeTuple._4)
-            document.addAttribute("COMMON_LINK_INFO", "")
-          }
-          resultDocumentList.add(document)
-          i += 1
-          if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) {
-            arangoManger.overwrite(resultDocumentList, "IP")
-            LOG.warn("更新IP:" + i)
-            i = 0
-          }
-        })
-        if (i != 0) {
-          arangoManger.overwrite(resultDocumentList, "IP")
-          LOG.warn("更新IP:" + i)
-        }
-      })
-      val last = System.currentTimeMillis()
-      LOG.warn(s"更新IP时间${last - start}")
-    } catch {
-      case e: Exception => e.printStackTrace()
-    } finally {
-      hisVerIpBc.destroy()
-    }
-  }
+  private def getVertexIpRow(row: Row, dictionaryMap: ConcurrentHashMap[String, BaseDocument]): BaseDocument = {
+    val ip = row.getAs[String]("IP")
+    val firstFoundTime = row.getAs[Long]("FIRST_FOUND_TIME")
+    val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME")
+    val sessionCountList = row.getAs[ofRef[AnyRef]]("SESSION_COUNT_LIST")
+    val bytesSumList = row.getAs[ofRef[AnyRef]]("BYTES_SUM_LIST")
+    val ipTypeList = row.getAs[ofRef[String]]("ip_type_list")
+    val sepAttributeTuple = separateAttributeByIpType(ipTypeList, sessionCountList, bytesSumList)
+    var document = dictionaryMap.getOrDefault(ip, null)
+    if (document != null) {
+      updateMaxAttribute(document, lastFoundTime, "LAST_FOUND_TIME")
+      updateSumAttribute(document, sepAttributeTuple._1, "SERVER_SESSION_COUNT")
+      updateSumAttribute(document, sepAttributeTuple._2, "SERVER_BYTES_SUM")
+      updateSumAttribute(document, sepAttributeTuple._3, "CLIENT_SESSION_COUNT")
+      updateSumAttribute(document, sepAttributeTuple._4, "CLIENT_BYTES_SUM")
+    } else {
+      document = new BaseDocument
+      document.setKey(ip)
+      document.addAttribute("IP", ip)
+      document.addAttribute("FIRST_FOUND_TIME", firstFoundTime)
+      document.addAttribute("LAST_FOUND_TIME", lastFoundTime)
+      document.addAttribute("SERVER_SESSION_COUNT", sepAttributeTuple._1)
+      document.addAttribute("SERVER_BYTES_SUM", sepAttributeTuple._2)
+      document.addAttribute("CLIENT_SESSION_COUNT", sepAttributeTuple._3)
+      document.addAttribute("CLIENT_BYTES_SUM", sepAttributeTuple._4)
+      document.addAttribute("COMMON_LINK_INFO", "")
+    }
+    document
+  }

-  def updateRelationFqdnLocateIp(): Unit = {
-    baseArangoData.readHistoryData("R_LOCATE_FQDN2IP", historyRelationFqdnAddressIpMap, classOf[BaseEdgeDocument])
-    val hisReFqdnLocIpBc = spark.sparkContext.broadcast(historyRelationFqdnAddressIpMap)
-    try {
-      val start = System.currentTimeMillis()
-      val mergeRelationFqdnLocateIpDf = mergeRelationFqdnLocateIp()
-      mergeRelationFqdnLocateIpDf.foreachPartition(iter => {
-        val partitionId: Int = TaskContext.get.partitionId
-        val hisRelaFqdnLocaIpMapTmp = hisReFqdnLocIpBc.value.get(partitionId)
-        val resultDocumentList: util.ArrayList[BaseEdgeDocument] = new util.ArrayList[BaseEdgeDocument]
-        var i = 0
-        iter.foreach(row => {
-          val fqdn = row.getAs[String]("FQDN")
-          val serverIp = row.getAs[String]("common_server_ip")
-          val firstFoundTime = row.getAs[Long]("FIRST_FOUND_TIME")
-          val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME")
-          val countTotalList = row.getAs[ofRef[AnyRef]]("COUNT_TOTAL_LIST")
-          val schemaTypeList = row.getAs[ofRef[AnyRef]]("schema_type_list")
-          val distCipRecent = row.getAs[ofRef[ofRef[String]]]("DIST_CIP_RECENT")
-          val sepAttritubeMap: Map[String, Long] = separateAttributeByProtocol(schemaTypeList, countTotalList)
-          val distinctIp: Array[String] = mergeDistinctIp(distCipRecent)
-          val key = fqdn.concat("-" + serverIp)
-          var document: BaseEdgeDocument = hisRelaFqdnLocaIpMapTmp.getOrDefault(key, null)
-          if (document != null) {
-            updateMaxAttribute(document, lastFoundTime, "LAST_FOUND_TIME")
-            updateProtocolAttritube(document, sepAttritubeMap)
-            updateDistinctIp(document, distinctIp)
-          } else {
-            document = new BaseEdgeDocument()
-            document.setKey(key)
-            document.setFrom("FQDN/" + fqdn)
-            document.setTo("IP/" + serverIp)
-            document.addAttribute("FIRST_FOUND_TIME", firstFoundTime)
-            document.addAttribute("LAST_FOUND_TIME", lastFoundTime)
-            putProtocolAttritube(document, sepAttritubeMap)
-            putDistinctIp(document, distinctIp)
-          }
-          resultDocumentList.add(document)
-          i += 1
-          if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) {
-            arangoManger.overwrite(resultDocumentList, "R_LOCATE_FQDN2IP")
-            LOG.warn("更新R_LOCATE_FQDN2IP:" + i)
-            i = 0
-          }
-        })
-        if (i != 0) {
-          arangoManger.overwrite(resultDocumentList, "R_LOCATE_FQDN2IP")
-          LOG.warn("更新R_LOCATE_FQDN2IP:" + i)
-        }
-      })
-      val last = System.currentTimeMillis()
-      LOG.warn(s"更新R_LOCATE_FQDN2IP时间${last - start}")
-    } catch {
-      case e: Exception => e.printStackTrace()
-    } finally {
-      hisReFqdnLocIpBc.destroy()
-    }
-  }
+  private def getRelationFqdnLocateIpRow(row: Row, dictionaryMap: ConcurrentHashMap[String, BaseEdgeDocument]): BaseEdgeDocument = {
+    val fqdn = row.getAs[String]("FQDN")
+    val serverIp = row.getAs[String]("common_server_ip")
+    val firstFoundTime = row.getAs[Long]("FIRST_FOUND_TIME")
+    val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME")
+    val countTotalList = row.getAs[ofRef[AnyRef]]("COUNT_TOTAL_LIST")
+    val schemaTypeList = row.getAs[ofRef[AnyRef]]("schema_type_list")
+    val distCipRecent = row.getAs[ofRef[ofRef[String]]]("DIST_CIP_RECENT")
+    val sepAttritubeMap: Map[String, Long] = separateAttributeByProtocol(schemaTypeList, countTotalList)
+    val distinctIp: Array[String] = mergeDistinctIp(distCipRecent)
+    val key = fqdn.concat("-" + serverIp)
+    var document = dictionaryMap.getOrDefault(key, null)
+    if (document != null) {
+      updateMaxAttribute(document, lastFoundTime, "LAST_FOUND_TIME")
+      updateProtocolAttritube(document, sepAttritubeMap)
+      updateDistinctIp(document, distinctIp)
+    } else {
+      document = new BaseEdgeDocument()
+      document.setKey(key)
+      document.setFrom("FQDN/" + fqdn)
+      document.setTo("IP/" + serverIp)
+      document.addAttribute("FIRST_FOUND_TIME", firstFoundTime)
+      document.addAttribute("LAST_FOUND_TIME", lastFoundTime)
+      putProtocolAttritube(document, sepAttritubeMap)
+      putDistinctIp(document, distinctIp)
+    }
+    document
+  }
 }
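
After this refactor the per-collection batching, history-map broadcast, logging and cleanup all live in the shared updateDocument, so supporting a further collection means writing one more row-mapper and registering it in update(). As an illustration only, a minimal sketch of such an extension inside UpdateDocument; the "AS" collection, its column names, historyVertexAsMap and mergeVertexAs are hypothetical and not part of this commit:

```scala
// Hypothetical extension point: every name not already defined or imported in this file
// ("AS", "ASN", historyVertexAsMap, mergeVertexAs) is illustrative only.
private def getVertexAsRow(row: Row, dictionaryMap: ConcurrentHashMap[String, BaseDocument]): BaseDocument = {
  val asn = row.getAs[String]("ASN")
  val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME")
  var document = dictionaryMap.getOrDefault(asn, null)
  if (document != null) {
    // Reuse the shared handler from UpdateDocHandler, exactly like the FQDN/IP mappers above.
    updateMaxAttribute(document, lastFoundTime, "LAST_FOUND_TIME")
  } else {
    document = new BaseDocument
    document.setKey(asn)
    document.addAttribute("ASN", asn)
    document.addAttribute("LAST_FOUND_TIME", lastFoundTime)
  }
  document
}

// ...plus a single extra line inside update():
// updateDocument("AS", historyVertexAsMap, getVertexAsRow, classOf[BaseDocument], mergeVertexAs)
```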