report version
@@ -27,9 +27,9 @@ arangoDB.ttl=3600
 thread.pool.number=5
 
 # How the clickhouse read window is chosen. 0: read the past hour; 1: use the explicit time range below
-clickhouse.time.limit.type=0
-read.clickhouse.max.time=1571245220
-read.clickhouse.min.time=1571245210
+clickhouse.time.limit.type=1
+read.clickhouse.max.time=1598246519
+read.clickhouse.min.time=1597161600
 
 # How the arangoDB read window is chosen. 0: normal read; 1: use an explicit time range
 arango.time.limit.type=0
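The change flips clickhouse.time.limit.type from 0 to 1 and updates read.clickhouse.min.time / read.clickhouse.max.time (both are Unix timestamps in August 2020), so the job reads a pinned time window instead of the rolling past hour. The project loads these settings through cn.ac.iie.config.ApplicationConfig, whose API is not shown in this commit; the sketch below is only a minimal stand-in using java.util.Properties, with the file name and fallback values assumed.

// Hypothetical sketch (not from the commit): how the time-window settings could be resolved.
// Property names come from the diff; the properties file name and defaults are assumptions.
import java.io.FileInputStream
import java.util.Properties

object TimeWindow {
  private val props = new Properties()
  props.load(new FileInputStream("application.properties")) // assumed location

  /** Returns (minTime, maxTime) in epoch seconds for the ClickHouse read. */
  def clickhouseWindow(nowSeconds: Long = System.currentTimeMillis / 1000): (Long, Long) =
    props.getProperty("clickhouse.time.limit.type", "0") match {
      case "1" => // explicit range taken from the properties file
        (props.getProperty("read.clickhouse.min.time").toLong,
         props.getProperty("read.clickhouse.max.time").toLong)
      case _   => // mode 0: the rolling past hour
        (nowSeconds - 3600, nowSeconds)
    }
}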
@@ -4,16 +4,21 @@ import java.util.regex.Pattern
 
 import cn.ac.iie.config.ApplicationConfig
 import cn.ac.iie.dao.BaseClickhouseData
+import cn.ac.iie.service.update.UpdateDocHandler.{mergeDistinctIp, separateAttributeByProtocol}
 import cn.ac.iie.spark.partition.CustomPartitioner
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.functions._
 import org.slf4j.LoggerFactory
 
+import scala.collection.mutable.WrappedArray.ofRef
+
 object MergeDataFrame {
   private val LOG = LoggerFactory.getLogger(MergeDataFrame.getClass)
   private val pattern = Pattern.compile("^[\\d]*$")
 
+  private val currentHour: Long = System.currentTimeMillis / (60 * 60 * 1000) * 60 * 60
+
   def mergeVertexFqdn(): RDD[Row] ={
     BaseClickhouseData.getVertexFqdnDf
       .rdd.filter(row => isDomain(row.getAs[String](0))).map(row => (row.get(0),row))
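The new currentHour field truncates the current time to the top of the hour and expresses it in epoch seconds: dividing the millisecond clock by 60 * 60 * 1000 yields whole hours since the epoch, and multiplying by 60 * 60 converts that hour count back to seconds. It later lands in the stat_time column. A worked example with an illustrative timestamp:

// Illustrative only: the truncation performed by currentHour, step by step.
object CurrentHourExample extends App {
  val nowMs     = 1598246519000L            // 2020-08-24 05:21:59 UTC, in milliseconds
  val hourIndex = nowMs / (60 * 60 * 1000)  // 443957 whole hours since the Unix epoch
  val hourInSec = hourIndex * 60 * 60       // 1598245200 = 2020-08-24 05:00:00 UTC, in seconds
  println(hourInSec)
}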
@@ -47,9 +52,17 @@ object MergeDataFrame {
     frame.rdd.map(row => {
       val fqdn = row.getAs[String]("FQDN")
       val serverIp = row.getAs[String]("common_server_ip")
-      val key = fqdn.concat("-"+serverIp)
-      (key,row)
-    }).partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS)).values
+      val firstFoundTime = row.getAs[Long]("FIRST_FOUND_TIME")
+      val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME")
+      val countTotalList = row.getAs[ofRef[AnyRef]]("COUNT_TOTAL_LIST")
+      val schemaTypeList = row.getAs[ofRef[AnyRef]]("schema_type_list")
+      val distCipRecent = row.getAs[ofRef[ofRef[String]]]("DIST_CIP_RECENT")
+      val sepAttritubeMap: Map[String, Long] = separateAttributeByProtocol(schemaTypeList, countTotalList)
+      val distinctIp: Array[String] = mergeDistinctIp(distCipRecent)
+
+      Row(fqdn,serverIp,firstFoundTime,lastFoundTime,
+        sepAttritubeMap.get("HTTP"),sepAttritubeMap.get("TLS"),sepAttritubeMap.get("DNS"),distinctIp,currentHour)
+    })
 
   }
 
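The rewritten mapping now emits a nine-field Row (FQDN, server IP, first/last seen, per-protocol counts, distinct client IPs, and currentHour) instead of a keyed pair routed through CustomPartitioner. Two details may be worth double-checking: sepAttritubeMap.get returns Option[Long], so the three count fields arrive as Options at the LongType columns, and the Row orders the counts HTTP/TLS/DNS while the schema declared in UpdateDocument orders them dns_cnt_total/tls_cnt_total/http_cnt_total. A hedged sketch that unwraps the counts and follows the schema order, assuming missing protocols should default to 0:

// Sketch only (not part of the commit): unwrapping the Option[Long] counts and
// following the dns/tls/http column order of the schema defined in UpdateDocument.
import org.apache.spark.sql.Row

def toRelationRow(fqdn: String, serverIp: String,
                  firstFoundTime: Long, lastFoundTime: Long,
                  counts: Map[String, Long],
                  distinctIp: Array[String], statTime: Long): Row =
  Row(fqdn, serverIp, firstFoundTime, lastFoundTime,
    counts.getOrElse("DNS", 0L),
    counts.getOrElse("TLS", 0L),
    counts.getOrElse("HTTP", 0L),
    distinctIp, statTime)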
@@ -1,80 +1,85 @@
 package cn.ac.iie.service.update
 
-import java.util
+import java.util.Properties
 import java.util.concurrent.ConcurrentHashMap
 
 import cn.ac.iie.config.ApplicationConfig
 import cn.ac.iie.dao.BaseArangoData
-import cn.ac.iie.dao.BaseArangoData._
-import cn.ac.iie.service.transform.MergeDataFrame._
 import cn.ac.iie.service.update.UpdateDocHandler._
-import cn.ac.iie.utils.{ArangoDBConnect, ExecutorThreadPool, SparkSessionUtil}
+import cn.ac.iie.utils.SparkSessionUtil
+import cn.ac.iie.service.transform.MergeDataFrame._
 import cn.ac.iie.utils.SparkSessionUtil.spark
 import com.arangodb.entity.{BaseDocument, BaseEdgeDocument}
-import org.apache.spark.TaskContext
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{Row, SaveMode}
+import org.apache.spark.sql.types._
 import org.slf4j.LoggerFactory
 
 import scala.collection.mutable.WrappedArray.ofRef
 
 object UpdateDocument {
-  private val pool = ExecutorThreadPool.getInstance
-  private val arangoManger: ArangoDBConnect = ArangoDBConnect.getInstance()
   private val LOG = LoggerFactory.getLogger(UpdateDocument.getClass)
-  private val baseArangoData = new BaseArangoData()
 
   def update(): Unit = {
     try {
-      updateDocument("FQDN", historyVertexFqdnMap, getVertexFqdnRow, classOf[BaseDocument], mergeVertexFqdn)
-      updateDocument("IP", historyVertexIpMap, getVertexIpRow, classOf[BaseDocument], mergeVertexIp)
-      updateDocument("R_LOCATE_FQDN2IP", historyRelationFqdnAddressIpMap, getRelationFqdnLocateIpRow, classOf[BaseEdgeDocument], mergeRelationFqdnLocateIp)
+      updateDocument("R_LOCATE_FQDN2IP", getRelationFqdnLocateIpRow, classOf[BaseEdgeDocument], mergeRelationFqdnLocateIp)
     } catch {
       case e: Exception => e.printStackTrace()
     } finally {
-      pool.shutdown()
-      arangoManger.clean()
      SparkSessionUtil.closeSpark()
    }
  }

  private def updateDocument[T <: BaseDocument](collName: String,
-                                                historyMap: ConcurrentHashMap[Integer, ConcurrentHashMap[String, T]],
-                                                getDocumentRow: (Row, ConcurrentHashMap[String, T]) => T,
+                                                getDocumentRow: Row => T,
                                                 clazz: Class[T],
                                                 getNewDataRdd: () => RDD[Row]
                                                ): Unit = {
-    baseArangoData.readHistoryData(collName, historyMap, clazz)
-    val hisBc = spark.sparkContext.broadcast(historyMap)
     try {
       val start = System.currentTimeMillis()
       val newDataRdd = getNewDataRdd()
+      val schema = StructType(List(
+        StructField("fqdn",StringType),
+        StructField("ip",StringType),
+        StructField("first_found_time",LongType),
+        StructField("last_found_time",LongType),
+        StructField("dns_cnt_total",LongType),
+        StructField("tls_cnt_total",LongType),
+        StructField("http_cnt_total",LongType),
+        StructField("dist_cip",ArrayType(StringType)),
+        StructField("stat_time",LongType)
+      ))
+      val frame = spark.createDataFrame(newDataRdd,schema)
+      /*
       newDataRdd.foreachPartition(iter => {
-        val partitionId: Int = TaskContext.get.partitionId
-        val dictionaryMap: ConcurrentHashMap[String, T] = hisBc.value.get(partitionId)
-        val resultDocumentList = new util.ArrayList[T]
         var i = 0
         iter.foreach(row => {
-          val document = getDocumentRow(row, dictionaryMap)
-          resultDocumentList.add(document)
+          val document = getDocumentRow(row)
           i += 1
           if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) {
-            arangoManger.overwrite(resultDocumentList, collName)
            LOG.warn(s"更新:$collName" + i)
            i = 0
          }
        })
        if (i != 0) {
-          arangoManger.overwrite(resultDocumentList, collName)
          LOG.warn(s"更新$collName:" + i)
        }
      })
+      */
+      frame.write.mode(SaveMode.Append).format("jdbc").options(
+        Map(
+          "driver" -> "ru.yandex.clickhouse.ClickHouseDriver",
+          "url" -> "jdbc:clickhouse://192.168.40.194:8123/ip_learning",
+          "dbtable" -> "r_locate_fqdn2ip_local",
+          "user" -> "default",
+          "password" -> "111111",
+          "batchsize" -> "10000",
+          "truncate" -> "true")
+      ).save()
       val last = System.currentTimeMillis()
       LOG.warn(s"更新$collName 时间:${last - start}")
     } catch {
       case e: Exception => e.printStackTrace()
-    } finally {
-      hisBc.destroy()
     }
   }
 
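The per-partition ArangoDB overwrite path (broadcast history map, ExecutorThreadPool and ArangoDBConnect) is commented out, and the merged rows are now appended to ClickHouse over JDBC with the URL, credentials and table name hard-coded. Note that Spark's JDBC truncate option only applies under SaveMode.Overwrite, so it has no effect in Append mode. Below is a minimal sketch of the same write with the connection settings pulled from configuration; the ApplicationConfig field names are assumptions, not part of this commit.

// Sketch only: the same ClickHouse JDBC write, with connection settings assumed
// to be exposed by ApplicationConfig instead of hard-coded literals.
import cn.ac.iie.config.ApplicationConfig
import org.apache.spark.sql.{DataFrame, SaveMode}

def writeToClickhouse(frame: DataFrame, table: String): Unit =
  frame.write
    .mode(SaveMode.Append)
    .format("jdbc")
    .option("driver", "ru.yandex.clickhouse.ClickHouseDriver")
    .option("url", ApplicationConfig.CLICKHOUSE_URL)          // e.g. jdbc:clickhouse://<host>:8123/ip_learning
    .option("dbtable", table)                                  // e.g. r_locate_fqdn2ip_local
    .option("user", ApplicationConfig.CLICKHOUSE_USER)
    .option("password", ApplicationConfig.CLICKHOUSE_PASSWORD)
    .option("batchsize", "10000")
    .save()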
@@ -126,7 +131,7 @@ object UpdateDocument {
     document
   }
 
-  private def getRelationFqdnLocateIpRow(row: Row, dictionaryMap: ConcurrentHashMap[String, BaseEdgeDocument]): BaseEdgeDocument = {
+  private def getRelationFqdnLocateIpRow(row: Row): BaseEdgeDocument = {
     val fqdn = row.getAs[String]("FQDN")
     val serverIp = row.getAs[String]("common_server_ip")
     val firstFoundTime = row.getAs[Long]("FIRST_FOUND_TIME")
@@ -139,13 +144,8 @@ object UpdateDocument {
     val distinctIp: Array[String] = mergeDistinctIp(distCipRecent)
 
     val key = fqdn.concat("-" + serverIp)
-    var document = dictionaryMap.getOrDefault(key, null)
-    if (document != null) {
-      updateMaxAttribute(document, lastFoundTime, "LAST_FOUND_TIME")
-      updateProtocolAttritube(document, sepAttritubeMap)
-      updateDistinctIp(document, distinctIp)
-    } else {
-      document = new BaseEdgeDocument()
+    val document:BaseEdgeDocument = new BaseEdgeDocument()
     document.setKey(key)
     document.setFrom("FQDN/" + fqdn)
     document.setTo("IP/" + serverIp)
@@ -153,7 +153,7 @@ object UpdateDocument {
     document.addAttribute("LAST_FOUND_TIME", lastFoundTime)
     putProtocolAttritube(document, sepAttritubeMap)
     putDistinctIp(document, distinctIp)
-    }
     document
   }
 