first commit

This commit is contained in:
wanglihui
2020-06-28 18:27:48 +08:00
parent 6f86960a70
commit 9ffe686f3f
34 changed files with 3310 additions and 0 deletions

View File

@@ -0,0 +1,125 @@
package cn.ac.iie.dao
import cn.ac.iie.test.Config
import org.apache.spark.sql.{DataFrame, SparkSession}
object BaseMediaDataLoad {
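// Loads a time-bounded slice of av_miner.media_expire_patch from ClickHouse and registers it as
// the global temp view media_expire_patch; the get* methods below aggregate FQDN/IP vertices and
// address/visit edges from that view, so loadMediaDate must run first in the same session.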
def loadMediaDate(spark: SparkSession): Unit = {
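// Partitioned JDBC read: recv_time is the partition column, so the [MINTIME, MAXTIME] range is
// split into NUMPARTITIONS parallel queries, and fetchsize bounds the rows pulled per round trip.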
val mediaDataFrame: DataFrame = spark.read.format("jdbc")
.option("url", "jdbc:clickhouse://192.168.40.193:8123")
.option("dbtable", s"(select media_domain,recv_time,s1_s_ip,s1_d_ip,s1_s_location_region,s1_d_location_region from av_miner.media_expire_patch where recv_time>=${Config.MINTIME} and recv_time<=${Config.MAXTIME})")
.option("driver", "ru.yandex.clickhouse.ClickHouseDriver")
.option("user", "default")
.option("password", "111111")
.option("numPartitions", Config.NUMPARTITIONS)
.option("partitionColumn", "recv_time")
.option("lowerBound", Config.MINTIME)
.option("upperBound", Config.MAXTIME)
.option("fetchsize", Config.SPARK_SQL_READ_FETCHSIZE)
.load()
mediaDataFrame.printSchema()
mediaDataFrame.createOrReplaceGlobalTempView("media_expire_patch")
}
def getFQDNVertexFromMedia(spark: SparkSession): DataFrame = {
val v_FQDN_DF = spark.sql(
"""
|SELECT
| media_domain AS new_fqdn_name,
| MIN( recv_time ) AS new_fqdn_first_found_time,
| MAX( recv_time ) AS new_fqdn_last_found_time,
| COUNT( * ) AS new_fqdn_count_total
|FROM
| global_temp.media_expire_patch
|WHERE
| media_domain != ''
|GROUP BY
| media_domain
""".stripMargin
)
v_FQDN_DF
}
def getIPVertexFromMedia(spark: SparkSession): DataFrame = {
val s_IP_DF = spark.sql(
"""
select
s1_s_ip as new_ip,
s1_s_location_region as new_location,
MIN( recv_time ) AS new_ip_first_found_time,
MAX( recv_time ) AS new_ip_last_found_time,
COUNT( * ) AS new_ip_count_total
from global_temp.media_expire_patch
GROUP BY
s1_s_ip,
s1_s_location_region
""".stripMargin)
val d_IP_DF = spark.sql(
"""
select
s1_d_ip as new_ip,
s1_d_location_region as new_location,
MIN( recv_time ) AS new_ip_first_found_time,
MAX( recv_time ) AS new_ip_last_found_time,
COUNT( * ) AS new_ip_count_total
from global_temp.media_expire_patch
GROUP BY
s1_d_ip,
s1_d_location_region
""".stripMargin)
import org.apache.spark.sql.functions._
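// Merge the source-side and destination-side aggregates per (ip, location): first/last found
// times combine with min/max, and the per-side totals are summed.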
val v_IP_DF = s_IP_DF.union(d_IP_DF).groupBy("new_ip", "new_location").agg(
min("new_ip_first_found_time").as("new_ip_first_found_time"),
max("new_ip_last_found_time").as("new_ip_last_found_time"),
count("new_ip").as("new_ip_count_total")
)
v_IP_DF
}
def getFQDNAddressIPEdgeFromMedia(spark: SparkSession): DataFrame = {
val e_Address_v_FQDN_to_v_IP_DF = spark.sql(
"""
|SELECT
| media_domain AS new_fqdn,
| s1_d_ip AS new_ip,
| MIN( recv_time ) AS new_first_found_time,
| MAX( recv_time ) AS new_last_found_time,
| COUNT( * ) AS new_count_total,
| CONCAT_WS('-',media_domain,s1_d_ip) AS new_key
|FROM
| global_temp.media_expire_patch
|WHERE
| ( media_domain != '' )
| AND ( s1_d_ip != '' )
|GROUP BY
| s1_d_ip,
| media_domain
""".stripMargin)
e_Address_v_FQDN_to_v_IP_DF
}
def getIPVisitFQDNEdgeFromMedia(spark: SparkSession): DataFrame = {
val e_Visit_v_IP_to_v_FQDN_DF = spark.sql(
"""
|SELECT
| s1_s_ip AS new_ip,
| media_domain AS new_fqdn,
| MIN( recv_time ) AS new_first_found_time,
| MAX( recv_time ) AS new_last_found_time,
| COUNT( * ) AS new_count_total,
| CONCAT_WS('-',s1_s_ip,media_domain) as new_key
|FROM
| global_temp.media_expire_patch
|WHERE
| ( s1_s_ip != '' )
| AND ( media_domain != '' )
|GROUP BY
| s1_s_ip,
| media_domain
""".stripMargin)
e_Visit_v_IP_to_v_FQDN_DF
}
}

View File

@@ -0,0 +1,177 @@
package cn.ac.iie.dao
import cn.ac.iie.test.ArangoDbTest.arangoDB
import cn.ac.iie.test.Config
import com.arangodb.entity.{BaseDocument, BaseEdgeDocument}
import org.apache.spark.sql.DataFrame
import scala.util.Try
object UpdateArangoGraph {
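// Per-partition upsert: each row is checked with documentExists, existing documents are batched
// into updateDocuments and new ones into importDocuments against the insert_iplearn_index database.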
/**
* Update FQDN vertices
*/
def updateFQDNVertex(v_FQDN_DF:DataFrame): Unit ={
v_FQDN_DF.printSchema()
v_FQDN_DF.foreachPartition(iter => {
val v_FQDN_Coll = arangoDB.db("insert_iplearn_index").collection("V_FQDN")
val docs_Insert = new java.util.ArrayList[BaseDocument]()
val docs_Update = new java.util.ArrayList[BaseDocument]()
var i = 0
iter.foreach(row => {
val fqdn = row.getAs[String]("FQDN_NAME")
val v_Fqdn_First = row.getAs[Long]("FQDN_FIRST_FOUND_TIME")
val v_Fqdn_Last = row.getAs[Long]("FQDN_LAST_FOUND_TIME")
val v_Fqdn_Cnt = row.getAs[Long]("FQDN_COUNT_TOTAL")
if (v_FQDN_Coll.documentExists(fqdn)) {
val document: BaseDocument = v_FQDN_Coll.getDocument(fqdn, classOf[BaseDocument])
val fqdn_Cnt = Try(document.getAttribute("FQDN_COUNT_TOTAL")).getOrElse(0).toString.toInt
document.addAttribute("FQDN_COUNT_TOTAL", fqdn_Cnt)
document.addAttribute("FQDN_LAST_FOUND_TIME", v_Fqdn_Last)
docs_Update.add(document)
} else {
val baseDocument: BaseDocument = new BaseDocument()
baseDocument.setKey(fqdn)
baseDocument.addAttribute("FQDN_NAME", fqdn)
baseDocument.addAttribute("FQDN_FIRST_FOUND_TIME", v_Fqdn_First)
baseDocument.addAttribute("FQDN_LAST_FOUND_TIME", v_Fqdn_Last)
baseDocument.addAttribute("FQDN_COUNT_TOTAL", v_Fqdn_Cnt)
docs_Insert.add(baseDocument)
}
i+=1
})
Try(v_FQDN_Coll.importDocuments(docs_Insert))
Try(v_FQDN_Coll.updateDocuments(docs_Update))
})
}
/**
* Update IP vertices
*/
def updateIPVertex(v_IP_DF:DataFrame): Unit ={
v_IP_DF.printSchema()
v_IP_DF.foreachPartition(iter => {
val v_IP_Coll = arangoDB.db("insert_iplearn_index").collection("V_IP")
val docs_Insert: java.util.ArrayList[BaseDocument] = new java.util.ArrayList[BaseDocument]()
val docs_Update: java.util.ArrayList[BaseDocument] = new java.util.ArrayList[BaseDocument]()
var i = 0
iter.foreach(row => {
val ip = row.getAs[String]("IP")
val location = row.getAs[String]("location")
val v_IP_First = row.getAs[Long]("IP_FIRST_FOUND_TIME")
val v_IP_Last = row.getAs[Long]("IP_LAST_FOUND_TIME")
val v_IP_Cnt = row.getAs[Long]("IP_COUNT_TOTAL")
if (v_IP_Coll.documentExists(ip)) {
val document: BaseDocument = v_IP_Coll.getDocument(ip, classOf[BaseDocument])
val ip_Cnt = Try(document.getAttribute("IP_APPEAR_COUNT")).getOrElse(0).toString.toInt
document.addAttribute("LAST_FOUND_TIME", v_IP_Last)
document.addAttribute("IP_APPEAR_COUNT", v_IP_Cnt+ip_Cnt)
docs_Update.add(document)
} else {
val baseDocument: BaseDocument = new BaseDocument()
baseDocument.setKey(ip)
baseDocument.addAttribute("IP", ip)
baseDocument.addAttribute("IP_LOCATION", location)
baseDocument.addAttribute("FIRST_FOUND_TIME", v_IP_First)
baseDocument.addAttribute("LAST_FOUND_TIME", v_IP_Last)
baseDocument.addAttribute("IP_APPEAR_COUNT", v_IP_Cnt)
docs_Insert.add(baseDocument)
}
i+=1
})
Try(v_IP_Coll.importDocuments(docs_Insert))
Try(v_IP_Coll.updateDocuments(docs_Update))
})
}
/**
* Aggregate the e_Address_Fqdn_to_IP edges
*/
def updateFQDNAddressIPEdge(e_Address_v_FQDN_to_v_IP_DF:DataFrame): Unit ={
e_Address_v_FQDN_to_v_IP_DF.printSchema()
e_Address_v_FQDN_to_v_IP_DF.foreachPartition(iter => {
val e_Add_Fqdn_to_IP_Coll = arangoDB.db("insert_iplearn_index").collection("E_ADDRESS_V_FQDN_TO_V_IP")
val docs_Insert: java.util.ArrayList[BaseEdgeDocument] = new java.util.ArrayList[BaseEdgeDocument]()
val docs_Update: java.util.ArrayList[BaseEdgeDocument] = new java.util.ArrayList[BaseEdgeDocument]()
var i = 0
iter.foreach(row => {
val fqdn = row.getAs[String]("V_FQDN")
val ip = row.getAs[String]("V_IP")
val e_First = row.getAs[Long]("FIRST_FOUND_TIME")
val e_Last = row.getAs[Long]("LAST_FOUND_TIME")
val e_Cnt = row.getAs[Long]("COUNT_TOTAL")
if (e_Add_Fqdn_to_IP_Coll.documentExists(fqdn+"-"+ip)) {
val document: BaseEdgeDocument = e_Add_Fqdn_to_IP_Coll.getDocument(fqdn+"-"+ip, classOf[BaseEdgeDocument])
val e_new_Cnt = Try(document.getAttribute("COUNT_TOTAL")).getOrElse(0).toString.toInt
document.addAttribute("LAST_FOUND_TIME", e_Last)
document.addAttribute("COUNT_TOTAL", e_new_Cnt+e_Cnt)
docs_Update.add(document)
} else {
val baseDocument: BaseEdgeDocument = new BaseEdgeDocument()
baseDocument.setKey(fqdn+"-"+ip)
baseDocument.setFrom(s"V_FQDN/$fqdn")
baseDocument.setTo(s"V_IP/$ip")
baseDocument.addAttribute("COUNT_TOTAL",e_Cnt)
baseDocument.addAttribute("FIRST_FOUND_TIME",e_First)
baseDocument.addAttribute("LAST_FOUND_TIME",e_Last)
docs_Insert.add(baseDocument)
}
// println(fqdn+"-"+ip)
i+=1
})
Try(e_Add_Fqdn_to_IP_Coll.importDocuments(docs_Insert))
Try(e_Add_Fqdn_to_IP_Coll.updateDocuments(docs_Update))
})
}
/**
* Aggregate the e_Visit_v_IP_to_v_FQDN edges
*/
def updateIPVisitFQDNEdge(e_Visit_v_IP_to_v_FQDN_DF:DataFrame): Unit ={
e_Visit_v_IP_to_v_FQDN_DF.printSchema()
e_Visit_v_IP_to_v_FQDN_DF.foreachPartition(iter => {
val e_Visit_Fqdn_to_IP_Coll = arangoDB.db("insert_iplearn_index").collection("E_VISIT_V_IP_TO_V_FQDN")
val docs_Insert: java.util.ArrayList[BaseEdgeDocument] = new java.util.ArrayList[BaseEdgeDocument]()
val docs_Update: java.util.ArrayList[BaseEdgeDocument] = new java.util.ArrayList[BaseEdgeDocument]()
var i = 0
iter.foreach(row => {
val fqdn = row.getAs[String]("V_FQDN")
val ip = row.getAs[String]("V_IP")
val e_First = row.getAs[Long]("FIRST_FOUND_TIME")
val e_Last = row.getAs[Long]("LAST_FOUND_TIME")
val e_Cnt = row.getAs[Long]("COUNT_TOTAL")
if (e_Visit_Fqdn_to_IP_Coll.documentExists(ip+"-"+fqdn)) {
val document: BaseEdgeDocument = e_Visit_Fqdn_to_IP_Coll.getDocument(ip+"-"+fqdn, classOf[BaseEdgeDocument])
val e_new_Cnt = Try(document.getAttribute("COUNT_TOTAL")).getOrElse(0).toString.toInt
document.addAttribute("LAST_FOUND_TIME", e_Last)
document.addAttribute("COUNT_TOTAL", e_new_Cnt+e_Cnt)
docs_Update.add(document)
} else {
val baseDocument: BaseEdgeDocument = new BaseEdgeDocument()
baseDocument.setKey(ip+"-"+fqdn)
baseDocument.setFrom(s"V_IP/$ip")
baseDocument.setTo(s"V_FQDN/$fqdn")
baseDocument.addAttribute("COUNT_TOTAL",e_Cnt)
baseDocument.addAttribute("FIRST_FOUND_TIME",e_First)
baseDocument.addAttribute("LAST_FOUND_TIME",e_Last)
docs_Insert.add(baseDocument)
}
i+=1
})
Try(e_Visit_Fqdn_to_IP_Coll.importDocuments(docs_Insert))
Try(e_Visit_Fqdn_to_IP_Coll.updateDocuments(docs_Update))
})
}
}

View File

@@ -0,0 +1,237 @@
package cn.ac.iie.dao
import cn.ac.iie.etl.CursorTransform
import cn.ac.iie.pojo.{BaseEdgeIPVisitFqdn, BaseEgdeFqdnAddressIP, BaseVertexFqdn, BaseVertexIP}
import cn.ac.iie.test.Config
import cn.ac.iie.utils.{ConfigUtils, InitArangoDBPool}
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.util.Try
object UpdateArangoGraphByArangoSpark {
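// Variant that avoids per-document existence checks: the caller passes in the current collection
// contents as "cursor" DataFrames, which are full-outer-joined with the newly aggregated batch on
// the document key; matched rows are replaced/updated, unmatched new rows are imported.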
/**
* Update FQDN vertices
*/
def updateFQDNVertex(v_FQDN_DF:DataFrame,v_FQDN_Cursor_DF: DataFrame): Unit ={
v_FQDN_DF.printSchema()
v_FQDN_Cursor_DF.printSchema()
val v_Fqdn_Join_Df = v_FQDN_DF
.join(v_FQDN_Cursor_DF,v_FQDN_DF("new_fqdn_name")===v_FQDN_Cursor_DF("key"),"fullouter")
v_Fqdn_Join_Df.printSchema()
v_Fqdn_Join_Df.coalesce(Config.REPARTITION_NUMBER).foreachPartition(iter => {
val v_FQDN_Coll = InitArangoDBPool.arangoDB.db(ConfigUtils.ARANGODB_DB_NAME).collection("V_FQDN")
val docs_Replace = new java.util.ArrayList[BaseVertexFqdn]()
val docs_Insert = new java.util.ArrayList[BaseVertexFqdn]()
iter.foreach(row => {
val new_fqdn_name = row.getAs[String]("new_fqdn_name")
val new_fqdn_first_found_time = row.getAs[Long]("new_fqdn_first_found_time")
val new_fqdn_last_found_time = row.getAs[Long]("new_fqdn_last_found_time")
val new_fqdn_count_total = row.getAs[Long]("new_fqdn_count_total")
val fqdn = row.getAs[String]("key")
val v_Fqdn_First = row.getAs[Long]("FQDN_FIRST_FOUND_TIME")
val v_Fqdn_Cnt = row.getAs[Long]("FQDN_COUNT_TOTAL")
if (fqdn != null) {
val document: BaseVertexFqdn = new BaseVertexFqdn()
document.setKey(new_fqdn_name)
document.setFQDN_NAME(new_fqdn_name)
document.setFQDN_FIRST_FOUND_TIME(v_Fqdn_First)
document.setFQDN_LAST_FOUND_TIME(new_fqdn_last_found_time)
document.setFQDN_COUNT_TOTAL(v_Fqdn_Cnt+new_fqdn_count_total)
docs_Replace.add(document)
} else {
val baseDocument: BaseVertexFqdn = new BaseVertexFqdn()
baseDocument.setKey(new_fqdn_name)
baseDocument.setFQDN_NAME(new_fqdn_name)
baseDocument.setFQDN_FIRST_FOUND_TIME(new_fqdn_first_found_time)
baseDocument.setFQDN_LAST_FOUND_TIME(new_fqdn_last_found_time)
baseDocument.setFQDN_COUNT_TOTAL(new_fqdn_count_total)
docs_Insert.add(baseDocument)
}
})
Try(v_FQDN_Coll.replaceDocuments(docs_Replace))
Try(v_FQDN_Coll.importDocuments(docs_Insert))
})
}
/**
* Update IP vertices
*/
def updateIPVertex(v_IP_DF:DataFrame,v_IP_Cursor_DF: DataFrame): Unit ={
v_IP_DF.printSchema()
v_IP_Cursor_DF.printSchema()
val v_IP_Join_DF = v_IP_DF.join(v_IP_Cursor_DF,v_IP_DF("new_ip")===v_IP_Cursor_DF("key"),"fullouter")
v_IP_Join_DF.printSchema()
v_IP_Join_DF.coalesce(Config.REPARTITION_NUMBER).foreachPartition(iter => {
val v_IP_Coll = InitArangoDBPool.arangoDB.db(ConfigUtils.ARANGODB_DB_NAME).collection("V_IP")
val docs_Insert: java.util.ArrayList[BaseVertexIP] = new java.util.ArrayList[BaseVertexIP]()
val docs_Replace: java.util.ArrayList[BaseVertexIP] = new java.util.ArrayList[BaseVertexIP]()
iter.foreach(row => {
val new_Ip = row.getAs[String]("new_ip")
val new_Location = row.getAs[String]("new_location")
val new_Ip_First_Found_Time = row.getAs[Long]("new_ip_first_found_time")
val new_Ip_Last_Found_Time = row.getAs[Long]("new_ip_last_found_time")
val new_Ip_Count_Total = row.getAs[Long]("new_ip_count_total")
val key = row.getAs[String]("key")
val location = row.getAs[String]("IP_LOCATION")
val v_IP_First = row.getAs[Long]("FIRST_FOUND_TIME")
val v_IP_Cnt = row.getAs[Long]("IP_APPEAR_COUNT")
if (key != null) {
val document = new BaseVertexIP()
document.setKey(key)
document.setIP(key)
document.setLAST_FOUND_TIME(new_Ip_Last_Found_Time)
document.setIP_APPEAR_COUNT(v_IP_Cnt+new_Ip_Count_Total)
document.setFIRST_FOUND_TIME(v_IP_First)
document.setIP_LOCATION(location)
docs_Replace.add(document)
} else {
val baseDocument = new BaseVertexIP()
baseDocument.setKey(new_Ip)
baseDocument.setIP(new_Ip)
baseDocument.setLAST_FOUND_TIME(new_Ip_Last_Found_Time)
baseDocument.setIP_APPEAR_COUNT(new_Ip_Count_Total)
baseDocument.setFIRST_FOUND_TIME(new_Ip_First_Found_Time)
baseDocument.setIP_LOCATION(new_Location)
docs_Insert.add(baseDocument)
}
})
Try(v_IP_Coll.importDocuments(docs_Insert))
Try(v_IP_Coll.updateDocuments(docs_Replace))
})
}
/**
* Aggregate the e_Address_Fqdn_to_IP edges
*/
def updateFQDNAddressIPEdge(e_Address_v_FQDN_to_v_IP_DF:DataFrame,e_Fqdn_Address_IP_Cursor_DF: DataFrame): Unit ={
e_Address_v_FQDN_to_v_IP_DF.printSchema()
e_Fqdn_Address_IP_Cursor_DF.printSchema()
val e_Address_v_FQDN_to_v_IP_Join_DF = e_Address_v_FQDN_to_v_IP_DF
.join(e_Fqdn_Address_IP_Cursor_DF,
e_Address_v_FQDN_to_v_IP_DF("new_key")===e_Fqdn_Address_IP_Cursor_DF("key"),
"fullouter")
e_Address_v_FQDN_to_v_IP_Join_DF.printSchema()
e_Address_v_FQDN_to_v_IP_Join_DF.coalesce(Config.REPARTITION_NUMBER).foreachPartition(iter => {
val e_Add_Fqdn_to_IP_Coll = InitArangoDBPool.arangoDB.db(ConfigUtils.ARANGODB_DB_NAME).collection("E_ADDRESS_V_FQDN_TO_V_IP")
val docs_Insert: java.util.ArrayList[BaseEgdeFqdnAddressIP] = new java.util.ArrayList[BaseEgdeFqdnAddressIP]()
val docs_Replace: java.util.ArrayList[BaseEgdeFqdnAddressIP] = new java.util.ArrayList[BaseEgdeFqdnAddressIP]()
iter.foreach(row => {
val new_Fqdn = row.getAs[String]("new_fqdn")
val new_IP = row.getAs[String]("new_ip")
val new_Key = row.getAs[String]("new_key")
val new_First_Found_Time = row.getAs[Long]("new_first_found_time")
val new_Last_Found_Time = row.getAs[Long]("new_last_found_time")
val new_Count_Total = row.getAs[Long]("new_count_total")
val from = row.getAs[String]("from")
val to = row.getAs[String]("to")
val key = row.getAs[String]("key")
val e_First_Time = row.getAs[Long]("FIRST_FOUND_TIME")
val e_Count_Total = row.getAs[Long]("COUNT_TOTAL")
if (key != null) {
val document = new BaseEgdeFqdnAddressIP()
document.setKey(key)
document.setFrom(from)
document.setTo(to)
document.setLAST_FOUND_TIME(new_Last_Found_Time)
document.setFIRST_FOUND_TIME(e_First_Time)
document.setCOUNT_TOTAL(new_Count_Total+e_Count_Total)
docs_Replace.add(document)
} else {
val baseDocument: BaseEgdeFqdnAddressIP = new BaseEgdeFqdnAddressIP()
baseDocument.setKey(new_Key)
baseDocument.setFrom(s"V_FQDN/$new_Fqdn")
baseDocument.setTo(s"V_IP/$new_IP")
baseDocument.setLAST_FOUND_TIME(new_Last_Found_Time)
baseDocument.setFIRST_FOUND_TIME(new_First_Found_Time)
baseDocument.setCOUNT_TOTAL(new_Count_Total)
docs_Insert.add(baseDocument)
}
})
Try(e_Add_Fqdn_to_IP_Coll.importDocuments(docs_Insert))
Try(e_Add_Fqdn_to_IP_Coll.replaceDocuments(docs_Replace))
})
}
/**
* Aggregate the e_Visit_v_IP_to_v_FQDN edges
*/
def updateIPVisitFQDNEdge(e_Visit_v_IP_to_v_FQDN_DF:DataFrame,e_IP_Visit_FQDN_Cursor_DF: DataFrame): Unit = {
e_Visit_v_IP_to_v_FQDN_DF.printSchema()
e_IP_Visit_FQDN_Cursor_DF.printSchema()
val e_Visit_v_IP_to_v_FQDN_Join_DF = e_Visit_v_IP_to_v_FQDN_DF
.join(e_IP_Visit_FQDN_Cursor_DF, e_Visit_v_IP_to_v_FQDN_DF("new_key") === e_IP_Visit_FQDN_Cursor_DF("key"), "fullouter")
e_Visit_v_IP_to_v_FQDN_Join_DF.printSchema()
e_Visit_v_IP_to_v_FQDN_Join_DF.coalesce(Config.REPARTITION_NUMBER).foreachPartition(iter => {
val e_Visit_Fqdn_to_IP_Coll = InitArangoDBPool.arangoDB.db(ConfigUtils.ARANGODB_DB_NAME).collection("E_VISIT_V_IP_TO_V_FQDN")
val docs_Insert: java.util.ArrayList[BaseEdgeIPVisitFqdn] = new java.util.ArrayList[BaseEdgeIPVisitFqdn]()
val docs_Replace: java.util.ArrayList[BaseEdgeIPVisitFqdn] = new java.util.ArrayList[BaseEdgeIPVisitFqdn]()
iter.foreach(row => {
val new_Fqdn = row.getAs[String]("new_fqdn")
val new_IP = row.getAs[String]("new_ip")
val new_Key = row.getAs[String]("new_key")
val new_First_Found_Time = row.getAs[Long]("new_first_found_time")
val new_Last_Found_Time = row.getAs[Long]("new_last_found_time")
val new_Count_Total = row.getAs[Long]("new_count_total")
val to = row.getAs[String]("to")
val from = row.getAs[String]("from")
val key = row.getAs[String]("key")
val e_First_Time = row.getAs[Long]("FIRST_FOUND_TIME")
val e_Count_Total = row.getAs[Long]("COUNT_TOTAL")
if (key != null) {
val document = new BaseEdgeIPVisitFqdn()
document.setKey(key)
document.setFrom(from)
document.setTo(to)
document.setLAST_FOUND_TIME(new_Last_Found_Time)
document.setFIRST_FOUND_TIME(e_First_Time)
document.setCOUNT_TOTAL(new_Count_Total+e_Count_Total)
docs_Replace.add(document)
} else {
val baseDocument: BaseEdgeIPVisitFqdn = new BaseEdgeIPVisitFqdn()
baseDocument.setKey(new_Key)
baseDocument.setFrom(s"V_FQDN/$new_Fqdn")
baseDocument.setTo(s"V_IP/$new_IP")
baseDocument.setLAST_FOUND_TIME(new_Last_Found_Time)
baseDocument.setFIRST_FOUND_TIME(new_First_Found_Time)
baseDocument.setCOUNT_TOTAL(new_Count_Total)
docs_Insert.add(baseDocument)
}
})
Try(e_Visit_Fqdn_to_IP_Coll.importDocuments(docs_Insert))
Try(e_Visit_Fqdn_to_IP_Coll.replaceDocuments(docs_Replace))
})
}
}

View File

@@ -0,0 +1,250 @@
package cn.ac.iie.dao
import cn.ac.iie.etl.CursorTransform
import cn.ac.iie.pojo.{BaseEdgeIPVisitFqdn, BaseEgdeFqdnAddressIP, BaseVertexFqdn, BaseVertexIP}
import cn.ac.iie.test.Config
import cn.ac.iie.utils.{ConfigUtils, InitArangoDBPool}
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.util.Try
object UpdateArangoGraphByDF {
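// Same join-based merge as UpdateArangoGraphByArangoSpark, except the existing documents are
// fetched inside each method via CursorTransform.cursorToDataFrame instead of being passed in.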
/**
* Update FQDN vertices
* @param v_FQDN_DF aggregated FQDN vertices read from ClickHouse
* @param spark the SparkSession
*/
def updateFQDNVertex(v_FQDN_DF:DataFrame,spark:SparkSession): Unit ={
v_FQDN_DF.printSchema()
val v_FQDN_Cursor_DF = CursorTransform.cursorToDataFrame("V_FQDN",classOf[BaseVertexFqdn],spark)
val v_Fqdn_Join_Df = v_FQDN_DF
.join(v_FQDN_Cursor_DF,v_FQDN_DF("new_fqdn_name")===v_FQDN_Cursor_DF("key"),"fullouter")
v_Fqdn_Join_Df.printSchema()
v_Fqdn_Join_Df.coalesce(Config.REPARTITION_NUMBER).foreachPartition(iter => {
val v_FQDN_Coll = InitArangoDBPool.arangoDB.db(ConfigUtils.ARANGODB_DB_NAME).collection("V_FQDN")
val docs_Replace = new java.util.ArrayList[BaseVertexFqdn]()
val docs_Insert = new java.util.ArrayList[BaseVertexFqdn]()
iter.foreach(row => {
val new_fqdn_name = row.getAs[String]("new_fqdn_name")
val new_fqdn_first_found_time = row.getAs[Long]("new_fqdn_first_found_time")
val new_fqdn_last_found_time = row.getAs[Long]("new_fqdn_last_found_time")
val new_fqdn_count_total = row.getAs[Long]("new_fqdn_count_total")
val fqdn = row.getAs[String]("key")
val v_Fqdn_First = row.getAs[Long]("FQDN_FIRST_FOUND_TIME")
val v_Fqdn_Cnt = row.getAs[Long]("FQDN_COUNT_TOTAL")
if (fqdn != null) {
val document: BaseVertexFqdn = new BaseVertexFqdn()
document.setKey(new_fqdn_name)
document.setFQDN_NAME(new_fqdn_name)
document.setFQDN_FIRST_FOUND_TIME(v_Fqdn_First)
document.setFQDN_LAST_FOUND_TIME(new_fqdn_last_found_time)
document.setFQDN_COUNT_TOTAL(v_Fqdn_Cnt+new_fqdn_count_total)
docs_Replace.add(document)
} else {
val baseDocument: BaseVertexFqdn = new BaseVertexFqdn()
baseDocument.setKey(new_fqdn_name)
baseDocument.setFQDN_NAME(new_fqdn_name)
baseDocument.setFQDN_FIRST_FOUND_TIME(new_fqdn_first_found_time)
baseDocument.setFQDN_LAST_FOUND_TIME(new_fqdn_last_found_time)
baseDocument.setFQDN_COUNT_TOTAL(new_fqdn_count_total)
docs_Insert.add(baseDocument)
}
})
Try(v_FQDN_Coll.replaceDocuments(docs_Replace))
Try(v_FQDN_Coll.importDocuments(docs_Insert))
})
}
/**
* Update IP vertices
* @param v_IP_DF aggregated IP vertices read from ClickHouse
* @param spark the SparkSession
*/
def updateIPVertex(v_IP_DF:DataFrame,spark:SparkSession): Unit ={
v_IP_DF.printSchema()
val v_IP_Cursor_DF = CursorTransform.cursorToDataFrame("V_IP",classOf[BaseVertexIP],spark)
val v_IP_Join_DF = v_IP_DF.join(v_IP_Cursor_DF,v_IP_DF("new_ip")===v_IP_Cursor_DF("key"),"fullouter")
v_IP_Join_DF.printSchema()
v_IP_Join_DF.coalesce(Config.REPARTITION_NUMBER).foreachPartition(iter => {
val v_IP_Coll = InitArangoDBPool.arangoDB.db(ConfigUtils.ARANGODB_DB_NAME).collection("V_IP")
val docs_Insert: java.util.ArrayList[BaseVertexIP] = new java.util.ArrayList[BaseVertexIP]()
val docs_Replace: java.util.ArrayList[BaseVertexIP] = new java.util.ArrayList[BaseVertexIP]()
iter.foreach(row => {
val new_Ip = row.getAs[String]("new_ip")
val new_Location = row.getAs[String]("new_location")
val new_Ip_First_Found_Time = row.getAs[Long]("new_ip_first_found_time")
val new_Ip_Last_Found_Time = row.getAs[Long]("new_ip_last_found_time")
val new_Ip_Count_Total = row.getAs[Long]("new_ip_count_total")
val key = row.getAs[String]("key")
val location = row.getAs[String]("IP_LOCATION")
val v_IP_First = row.getAs[Long]("FIRST_FOUND_TIME")
val v_IP_Cnt = row.getAs[Long]("IP_APPEAR_COUNT")
if (key != null) {
val document = new BaseVertexIP()
document.setKey(key)
document.setIP(key)
document.setLAST_FOUND_TIME(new_Ip_Last_Found_Time)
document.setIP_APPEAR_COUNT(v_IP_Cnt+new_Ip_Count_Total)
document.setFIRST_FOUND_TIME(v_IP_First)
document.setIP_LOCATION(location)
docs_Replace.add(document)
} else {
val baseDocument = new BaseVertexIP()
baseDocument.setKey(new_Ip)
baseDocument.setIP(new_Ip)
baseDocument.setLAST_FOUND_TIME(new_Ip_Last_Found_Time)
baseDocument.setIP_APPEAR_COUNT(new_Ip_Count_Total)
baseDocument.setFIRST_FOUND_TIME(new_Ip_First_Found_Time)
baseDocument.setIP_LOCATION(new_Location)
docs_Insert.add(baseDocument)
}
})
Try(v_IP_Coll.importDocuments(docs_Insert))
Try(v_IP_Coll.updateDocuments(docs_Replace))
})
}
/**
* Aggregate the e_Address_Fqdn_to_IP edges
* @param e_Address_v_FQDN_to_v_IP_DF aggregated FQDN-address-IP edges read from ClickHouse
* @param spark the SparkSession
*/
def updateFQDNAddressIPEdge(e_Address_v_FQDN_to_v_IP_DF:DataFrame,spark:SparkSession): Unit ={
e_Address_v_FQDN_to_v_IP_DF.printSchema()
val e_Fqdn_Address_IP_Cursor_DF = CursorTransform
.cursorToDataFrame("E_ADDRESS_V_FQDN_TO_V_IP",classOf[BaseEgdeFqdnAddressIP],spark)
e_Fqdn_Address_IP_Cursor_DF.printSchema()
val e_Address_v_FQDN_to_v_IP_Join_DF = e_Address_v_FQDN_to_v_IP_DF
.join(e_Fqdn_Address_IP_Cursor_DF,
e_Address_v_FQDN_to_v_IP_DF("new_key")===e_Fqdn_Address_IP_Cursor_DF("key"),
"fullouter")
e_Address_v_FQDN_to_v_IP_Join_DF.printSchema()
e_Address_v_FQDN_to_v_IP_Join_DF.coalesce(Config.REPARTITION_NUMBER).foreachPartition(iter => {
val e_Add_Fqdn_to_IP_Coll = InitArangoDBPool.arangoDB.db(ConfigUtils.ARANGODB_DB_NAME).collection("E_ADDRESS_V_FQDN_TO_V_IP")
val docs_Insert: java.util.ArrayList[BaseEgdeFqdnAddressIP] = new java.util.ArrayList[BaseEgdeFqdnAddressIP]()
val docs_Replace: java.util.ArrayList[BaseEgdeFqdnAddressIP] = new java.util.ArrayList[BaseEgdeFqdnAddressIP]()
iter.foreach(row => {
val new_Fqdn = row.getAs[String]("new_fqdn")
val new_IP = row.getAs[String]("new_ip")
val new_Key = row.getAs[String]("new_key")
val new_First_Found_Time = row.getAs[Long]("new_first_found_time")
val new_Last_Found_Time = row.getAs[Long]("new_last_found_time")
val new_Count_Total = row.getAs[Long]("new_count_total")
val from = row.getAs[String]("from")
val to = row.getAs[String]("to")
val key = row.getAs[String]("key")
val e_First_Time = row.getAs[Long]("FIRST_FOUND_TIME")
val e_Count_Total = row.getAs[Long]("COUNT_TOTAL")
if (key != null) {
val document = new BaseEgdeFqdnAddressIP()
document.setKey(key)
document.setFrom(from)
document.setTo(to)
document.setLAST_FOUND_TIME(new_Last_Found_Time)
document.setFIRST_FOUND_TIME(e_First_Time)
document.setCOUNT_TOTAL(new_Count_Total+e_Count_Total)
docs_Replace.add(document)
} else {
val baseDocument: BaseEgdeFqdnAddressIP = new BaseEgdeFqdnAddressIP()
baseDocument.setKey(new_Key)
baseDocument.setFrom(s"V_FQDN/$new_Fqdn")
baseDocument.setTo(s"V_IP/$new_IP")
baseDocument.setLAST_FOUND_TIME(new_Last_Found_Time)
baseDocument.setFIRST_FOUND_TIME(new_First_Found_Time)
baseDocument.setCOUNT_TOTAL(new_Count_Total)
docs_Insert.add(baseDocument)
}
})
Try(e_Add_Fqdn_to_IP_Coll.importDocuments(docs_Insert))
Try(e_Add_Fqdn_to_IP_Coll.replaceDocuments(docs_Replace))
})
}
/**
* Aggregate the e_Visit_v_IP_to_v_FQDN edges
* @param e_Visit_v_IP_to_v_FQDN_DF aggregated IP-visit-FQDN edges read from ClickHouse
* @param spark the SparkSession
*/
def updateIPVisitFQDNEdge(e_Visit_v_IP_to_v_FQDN_DF:DataFrame,spark:SparkSession): Unit = {
e_Visit_v_IP_to_v_FQDN_DF.printSchema()
val e_IP_Visit_FQDN_Cursor_DF = CursorTransform
.cursorToDataFrame("E_VISIT_V_IP_TO_V_FQDN",classOf[BaseEdgeIPVisitFqdn],spark)
e_IP_Visit_FQDN_Cursor_DF.printSchema()
val e_Visit_v_IP_to_v_FQDN_Join_DF = e_Visit_v_IP_to_v_FQDN_DF
.join(e_IP_Visit_FQDN_Cursor_DF, e_Visit_v_IP_to_v_FQDN_DF("new_key") === e_IP_Visit_FQDN_Cursor_DF("key"), "fullouter")
e_Visit_v_IP_to_v_FQDN_Join_DF.printSchema()
e_Visit_v_IP_to_v_FQDN_Join_DF.coalesce(Config.REPARTITION_NUMBER).foreachPartition(iter => {
val e_Visit_Fqdn_to_IP_Coll = InitArangoDBPool.arangoDB.db(ConfigUtils.ARANGODB_DB_NAME).collection("E_VISIT_V_IP_TO_V_FQDN")
val docs_Insert: java.util.ArrayList[BaseEdgeIPVisitFqdn] = new java.util.ArrayList[BaseEdgeIPVisitFqdn]()
val docs_Replace: java.util.ArrayList[BaseEdgeIPVisitFqdn] = new java.util.ArrayList[BaseEdgeIPVisitFqdn]()
iter.foreach(row => {
val new_Fqdn = row.getAs[String]("new_fqdn")
val new_IP = row.getAs[String]("new_ip")
val new_Key = row.getAs[String]("new_key")
val new_First_Found_Time = row.getAs[Long]("new_first_found_time")
val new_Last_Found_Time = row.getAs[Long]("new_last_found_time")
val new_Count_Total = row.getAs[Long]("new_count_total")
val to = row.getAs[String]("to")
val from = row.getAs[String]("from")
val key = row.getAs[String]("key")
val e_First_Time = row.getAs[Long]("FIRST_FOUND_TIME")
val e_Count_Total = row.getAs[Long]("COUNT_TOTAL")
if (key != null) {
val document = new BaseEdgeIPVisitFqdn()
document.setKey(key)
document.setFrom(from)
document.setTo(to)
document.setLAST_FOUND_TIME(new_Last_Found_Time)
document.setFIRST_FOUND_TIME(e_First_Time)
document.setCOUNT_TOTAL(new_Count_Total+e_Count_Total)
docs_Replace.add(document)
} else {
val baseDocument: BaseEdgeIPVisitFqdn = new BaseEdgeIPVisitFqdn()
baseDocument.setKey(new_Key)
baseDocument.setFrom(s"V_FQDN/$new_Fqdn")
baseDocument.setTo(s"V_IP/$new_IP")
baseDocument.setLAST_FOUND_TIME(new_Last_Found_Time)
baseDocument.setFIRST_FOUND_TIME(new_First_Found_Time)
baseDocument.setCOUNT_TOTAL(new_Count_Total)
docs_Insert.add(baseDocument)
}
})
Try(e_Visit_Fqdn_to_IP_Coll.importDocuments(docs_Insert))
Try(e_Visit_Fqdn_to_IP_Coll.replaceDocuments(docs_Replace))
})
}
}

View File

@@ -0,0 +1,33 @@
package cn.ac.iie.etl
import cn.ac.iie.utils.{ConfigUtils, InitArangoDBPool}
import com.arangodb.ArangoCursor
import com.arangodb.entity.BaseDocument
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.reflect.ClassTag
object CursorTransform {
/**
* Convert the result set of an ArangoDB query into a DataFrame
* @param collection_Name the collection to query
* @param class_Type the POJO class the documents are mapped to
* @param spark the SparkSession
* @tparam T
* @return
*/
def cursorToDataFrame[T:ClassTag](collection_Name:String,class_Type: Class[T],spark:SparkSession): DataFrame ={
val query = s"FOR doc IN $collection_Name RETURN doc"
println(query)
val cursor: ArangoCursor[T] = InitArangoDBPool.arangoDB.db(ConfigUtils.ARANGODB_DB_NAME)
.query(query, InitArangoDBPool.bindVars, InitArangoDBPool.options, class_Type)
val cursor_DF = spark.createDataFrame(cursor.asListRemaining(),class_Type)
cursor_DF.printSchema()
cursor_DF
}
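// Example (as in UpdateArangoGraphByDF): load the current V_FQDN documents so they can be
// full-outer-joined with the freshly aggregated batch on the "key" column:
//   val v_FQDN_Cursor_DF = cursorToDataFrame("V_FQDN", classOf[BaseVertexFqdn], spark)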
}

View File

@@ -0,0 +1,29 @@
package cn.ac.iie.main
import cn.ac.iie.test.Config
import cn.ac.iie.dao.BaseMediaDataLoad
import org.apache.spark.sql.SparkSession
import org.slf4j.{Logger, LoggerFactory}
object IPLearningApplication {
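// Entry point: builds the SparkSession and currently only loads the ClickHouse slice;
// the vertex/edge aggregation and ArangoDB update steps are not wired in yet.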
private val logger: Logger = LoggerFactory.getLogger(IPLearningApplication.getClass)
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.appName("test")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// .config("spark.kryo.classesToRegister","com.tinkerpop.blueprints.impls.orient.OrientGraphFactory")
.config("spark.network.timeout", "300s")
.config("spark.sql.shuffle.partitions", Config.SPARK_SQL_SHUFFLE_PARTITIONS)
.config("spark.executor.memory", Config.SPARK_EXECUTOR_MEMORY)
.master(Config.MASTER)
.getOrCreate()
logger.warn("sparkession获取成功")
BaseMediaDataLoad.loadMediaDate(spark)
// BaseMediaDataLoad.
}
}

View File

@@ -0,0 +1,34 @@
package cn.ac.iie.pojo
import com.arangodb.entity.DocumentField
import com.arangodb.entity.DocumentField.Type
import scala.beans.BeanProperty
class BaseEdgeIPVisitFqdn {
@BeanProperty
@DocumentField(Type.FROM)
var from: String=""
@BeanProperty
@DocumentField(Type.TO)
var to: String=""
@BeanProperty
@DocumentField(Type.KEY)
var key: String=""
@BeanProperty
@DocumentField(Type.ID)
var id: String=""
@BeanProperty
var FIRST_FOUND_TIME:Long = 0
@BeanProperty
var LAST_FOUND_TIME:Long = 0
@BeanProperty
var COUNT_TOTAL:Long = 0
}

View File

@@ -0,0 +1,34 @@
package cn.ac.iie.pojo
import com.arangodb.entity.DocumentField
import com.arangodb.entity.DocumentField.Type
import scala.beans.BeanProperty
class BaseEgdeFqdnAddressIP {
@BeanProperty
@DocumentField(Type.FROM)
var from: String=""
@BeanProperty
@DocumentField(Type.TO)
var to: String=""
@BeanProperty
@DocumentField(Type.KEY)
var key: String=""
@BeanProperty
@DocumentField(Type.ID)
var id: String=""
@BeanProperty
var FIRST_FOUND_TIME:Long = 0
@BeanProperty
var LAST_FOUND_TIME:Long = 0
@BeanProperty
var COUNT_TOTAL:Long = 0
}

View File

@@ -0,0 +1,30 @@
package cn.ac.iie.pojo
import com.arangodb.entity.DocumentField
import com.arangodb.entity.DocumentField.Type
import scala.beans.BeanProperty
class BaseVertexFqdn {
@BeanProperty
@DocumentField(Type.KEY)
var key: String=""
@BeanProperty
@DocumentField(Type.ID)
var id: String=""
@BeanProperty
var FQDN_FIRST_FOUND_TIME:Long = 0
@BeanProperty
var FQDN_LAST_FOUND_TIME:Long = 0
@BeanProperty
var FQDN_COUNT_TOTAL:Long = 0
@BeanProperty
var FQDN_NAME:String = ""
}

View File

@@ -0,0 +1,32 @@
package cn.ac.iie.pojo
import com.arangodb.entity.DocumentField
import com.arangodb.entity.DocumentField.Type
import scala.beans.BeanProperty
class BaseVertexIP {
@BeanProperty
@DocumentField(Type.KEY)
var key: String=""
@BeanProperty
@DocumentField(Type.ID)
var id: String=""
@BeanProperty
var FIRST_FOUND_TIME:Long = 0
@BeanProperty
var LAST_FOUND_TIME:Long = 0
@BeanProperty
var IP_APPEAR_COUNT:Long = 0
@BeanProperty
var IP:String = ""
@BeanProperty
var IP_LOCATION:String = ""
}

View File

@@ -0,0 +1,52 @@
package cn.ac.iie.test
import cn.ac.iie.dao.{BaseMediaDataLoad, UpdateArangoGraphByArangoSpark}
import cn.ac.iie.pojo.{BaseEdgeIPVisitFqdn, BaseEgdeFqdnAddressIP, BaseVertexFqdn, BaseVertexIP}
import cn.ac.iie.utils.ConfigUtils
import com.arangodb.spark.rdd.ArangoRDD
import com.arangodb.spark.{ArangoSpark, ReadOptions}
import org.apache.spark.sql.{DataFrame, SparkSession}
object ArangoDBSparkTest {
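// End-to-end run using the arangodb-spark connector: the current collections are read with
// ArangoSpark.load into bean RDDs, converted to DataFrames, and merged with the new ClickHouse
// aggregates through UpdateArangoGraphByArangoSpark.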
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.appName(ConfigUtils.SPARK_APP_NAME)
.config("spark.serializer", ConfigUtils.SPARK_SERIALIZER)
.config("spark.network.timeout", ConfigUtils.SPARK_NETWORK_TIMEOUT)
.config("spark.sql.shuffle.partitions", ConfigUtils.SPARK_SQL_SHUFFLE_PARTITIONS)
.config("spark.executor.memory", ConfigUtils.SPARK_EXECUTOR_MEMORY)
.config("arangodb.hosts", "192.168.40.127:8529")
.config("arangodb.user", ConfigUtils.ARANGODB_USER)
.config("arangodb.password", ConfigUtils.ARANGODB_PASSWORD)
.config("arangodb.maxConnections",ConfigUtils.MAXPOOLSIZE)
.master(ConfigUtils.MASTER)
.getOrCreate()
BaseMediaDataLoad.loadMediaDate(spark)
val v_FQDN_DF = BaseMediaDataLoad.getFQDNVertexFromMedia(spark)
val v_IP_DF = BaseMediaDataLoad.getIPVertexFromMedia(spark)
val e_Address_v_FQDN_to_v_IP_DF = BaseMediaDataLoad.getFQDNAddressIPEdgeFromMedia(spark)
val e_Visit_v_IP_to_v_FQDN_DF= BaseMediaDataLoad.getIPVisitFQDNEdgeFromMedia(spark)
val v_FQDN_Cursor_Rdd: ArangoRDD[BaseVertexFqdn] = ArangoSpark.load[BaseVertexFqdn](spark.sparkContext, "V_FQDN", ReadOptions(ConfigUtils.ARANGODB_DB_NAME))
val v_FQDN_Cursor_DF: DataFrame = spark.createDataFrame(v_FQDN_Cursor_Rdd,classOf[BaseVertexFqdn])
val v_IP_Cursor_Rdd: ArangoRDD[BaseVertexIP] = ArangoSpark.load[BaseVertexIP](spark.sparkContext, "V_IP", ReadOptions(ConfigUtils.ARANGODB_DB_NAME))
val v_IP_Cursor_DF: DataFrame = spark.createDataFrame(v_IP_Cursor_Rdd,classOf[BaseVertexIP])
val e_Fqdn_Address_IP_Cursor_Rdd: ArangoRDD[BaseEgdeFqdnAddressIP] = ArangoSpark.load[BaseEgdeFqdnAddressIP](spark.sparkContext, "E_ADDRESS_V_FQDN_TO_V_IP", ReadOptions(ConfigUtils.ARANGODB_DB_NAME))
val e_Fqdn_Address_IP_Cursor_DF: DataFrame = spark.createDataFrame(e_Fqdn_Address_IP_Cursor_Rdd,classOf[BaseEgdeFqdnAddressIP])
val e_IP_Visit_FQDN_Cursor_Rdd: ArangoRDD[BaseEdgeIPVisitFqdn] = ArangoSpark.load[BaseEdgeIPVisitFqdn](spark.sparkContext, "E_VISIT_V_IP_TO_V_FQDN", ReadOptions(ConfigUtils.ARANGODB_DB_NAME))
val e_IP_Visit_FQDN_Cursor_DF: DataFrame = spark.createDataFrame(e_IP_Visit_FQDN_Cursor_Rdd,classOf[BaseEdgeIPVisitFqdn])
UpdateArangoGraphByArangoSpark.updateFQDNVertex(v_FQDN_DF,v_FQDN_Cursor_DF)
UpdateArangoGraphByArangoSpark.updateIPVertex(v_IP_DF,v_IP_Cursor_DF)
UpdateArangoGraphByArangoSpark.updateFQDNAddressIPEdge(e_Address_v_FQDN_to_v_IP_DF,e_Fqdn_Address_IP_Cursor_DF)
UpdateArangoGraphByArangoSpark.updateIPVisitFQDNEdge(e_Visit_v_IP_to_v_FQDN_DF,e_IP_Visit_FQDN_Cursor_DF)
spark.close()
System.exit(0) // exit with a success status once the update completes
}
}

View File

@@ -0,0 +1,37 @@
package cn.ac.iie.test
import com.arangodb.entity.BaseDocument
import com.arangodb.model.AqlQueryOptions
import com.arangodb.util.MapBuilder
import com.arangodb.{ArangoCursor, ArangoDB}
object ArangoDbReadV_IPTest {
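// Standalone check: stream the whole V_IP collection through an AQL cursor into a HashMap
// and print how many documents it holds.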
@transient
var arangoDB: ArangoDB = _
def main(args: Array[String]): Unit = {
arangoDB = new ArangoDB.Builder()
.maxConnections(Config.MAXPOOLSIZE)
.host("192.168.40.127", 8529)
.user("root")
.password("111111")
.build
val bindVars = new MapBuilder().get
val options = new AqlQueryOptions()
.ttl(Config.ARANGODB_TTL)
val v_IP_Mutabal_Map = new java.util.HashMap[String,BaseDocument](16048576,0.9f)
val v_IP_Query = "FOR doc IN V_IP RETURN doc"
val v_IP_Cursor: ArangoCursor[BaseDocument] = arangoDB.db("insert_iplearn_index")
.query(v_IP_Query, bindVars, options, classOf[BaseDocument])
while (v_IP_Cursor.hasNext){
val document = v_IP_Cursor.next()
v_IP_Mutabal_Map.put(document.getKey ,document)
}
println(v_IP_Mutabal_Map.size())
arangoDB.shutdown()
}
}

View File

@@ -0,0 +1,314 @@
package cn.ac.iie.test
import com.arangodb.ArangoDB
import com.arangodb.entity.{BaseDocument, BaseEdgeDocument}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
import scala.util.Try
object ArangoDbTest {
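// Inline prototype of the whole pipeline: ClickHouse read, Spark SQL aggregation, then
// per-partition documentExists upserts into ArangoDB (the same logic that BaseMediaDataLoad
// and UpdateArangoGraph factor out).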
private val logger: Logger = LoggerFactory.getLogger(ArangoDbTest.getClass)
@transient
var arangoDB: ArangoDB = _
def main(args: Array[String]): Unit = {
// val warehouseLocation = new File("spark-warehouse").getAbsolutePath
val spark: SparkSession = SparkSession
.builder()
.appName("test")
.config("spark.serializer", Config.SPARK_SERIALIZER)
// .config("spark.kryo.classesToRegister","com.tinkerpop.blueprints.impls.orient.OrientGraphFactory")
.config("spark.network.timeout", "300s")
.config("spark.sql.shuffle.partitions", Config.SPARK_SQL_SHUFFLE_PARTITIONS)
.config("spark.executor.memory", Config.SPARK_EXECUTOR_MEMORY)
.master(Config.MASTER)
/*
.config("spark.driver.host", "192.168.41.79")
.config("spark.jars", "D:\\GITREPO\\ip-learning\\target\\ip-learning-1.0-SNAPSHOT-jar-with-dependencies.jar")
.master("spark://192.168.40.119:7077")
*/
.getOrCreate()
logger.warn("sparkession获取成功")
// val sql = "(select * from av_miner.media_expire_patch_local limit 1000)"
val mediaDataFrame: DataFrame = spark.read.format("jdbc")
.option("url", "jdbc:clickhouse://192.168.40.193:8123")
// .option("dbtable", "av_miner.media_expire_patch")
// .option("dbtable", "(select * from av_miner.media_expire_patch limit 10)")
// .option("dbtable","(select media_domain,recv_time,s1_s_ip,s1_d_ip,s1_s_location_region,s1_d_location_region from av_miner.media_expire_patch where media_domain not LIKE '%\\n%')")
.option("dbtable", s"(select media_domain,recv_time,s1_s_ip,s1_d_ip,s1_s_location_region,s1_d_location_region from av_miner.media_expire_patch where recv_time>=${Config.MINTIME} and recv_time<=${Config.MAXTIME})")
// .option("dbtable","(select media_domain,s1_s_ip,s1_d_ip,s1_s_location_region,s1_d_location_region,min(recv_time) as min_recv_time,max(recv_time) as max_recv_time from av_miner.media_expire_patch group by media_domain,s1_s_ip,s1_d_ip,s1_s_location_region,s1_d_location_region limit 10)")
.option("driver", "ru.yandex.clickhouse.ClickHouseDriver")
.option("user", "default")
.option("password", "111111")
.option("numPartitions", Config.NUMPARTITIONS)
.option("partitionColumn", "recv_time")
.option("lowerBound", Config.MINTIME)
.option("upperBound", Config.MAXTIME)
.option("fetchsize", Config.SPARK_SQL_READ_FETCHSIZE)
.load()
// mediaDataFrame.printSchema()
mediaDataFrame.createOrReplaceGlobalTempView("media_expire_patch")
// val mediaDataGlobalView = spark.sql("select * from global_temp.media_expire_patch limit 10")
// mediaDataGlobalView.show()
val v_FQDN_DF = spark.sql(
"""
|SELECT
| media_domain AS FQDN_NAME,
| MIN( recv_time ) AS FQDN_FIRST_FOUND_TIME,
| MAX( recv_time ) AS FQDN_LAST_FOUND_TIME,
| COUNT( * ) AS FQDN_COUNT_TOTAL
|FROM
| global_temp.media_expire_patch
|WHERE
| media_domain != ''
|GROUP BY
| media_domain
""".stripMargin
)
val s_IP_DF = spark.sql(
"""
select
s1_s_ip as IP,
s1_s_location_region as location,
MIN( recv_time ) AS IP_FIRST_FOUND_TIME,
MAX( recv_time ) AS IP_LAST_FOUND_TIME,
COUNT( * ) AS IP_COUNT_TOTAL
from global_temp.media_expire_patch
GROUP BY
IP,
location
""".stripMargin)
val d_IP_DF = spark.sql(
"""
select
s1_d_ip as IP,
s1_d_location_region as location,
MIN( recv_time ) AS IP_FIRST_FOUND_TIME,
MAX( recv_time ) AS IP_LAST_FOUND_TIME,
COUNT( * ) AS IP_COUNT_TOTAL
from global_temp.media_expire_patch
GROUP BY
IP,
location
""".stripMargin)
import org.apache.spark.sql.functions._
val v_IP_DF = s_IP_DF.union(d_IP_DF).groupBy("IP", "location").agg(
min("IP_FIRST_FOUND_TIME").as("IP_FIRST_FOUND_TIME"),
max("IP_LAST_FOUND_TIME").as("IP_LAST_FOUND_TIME"),
count("IP").as("IP_COUNT_TOTAL")
)
val e_Address_v_FQDN_to_v_IP_DF = spark.sql(
"""
|SELECT
| media_domain AS V_FQDN,
| s1_d_ip AS V_IP,
| MIN( recv_time ) AS FIRST_FOUND_TIME,
| MAX( recv_time ) AS LAST_FOUND_TIME,
| COUNT( * ) AS COUNT_TOTAL
|FROM
| global_temp.media_expire_patch
|WHERE
| ( media_domain != '' )
| AND ( s1_d_ip != '' )
|GROUP BY
| s1_d_ip,
| media_domain
""".stripMargin)
val e_Visit_v_IP_to_v_FQDN_DF = spark.sql(
"""
|SELECT
| s1_s_ip AS V_IP,
| media_domain AS V_FQDN,
| MIN( recv_time ) AS FIRST_FOUND_TIME,
| MAX( recv_time ) AS LAST_FOUND_TIME,
| COUNT( * ) AS COUNT_TOTAL
|FROM
| global_temp.media_expire_patch
|WHERE
| ( s1_s_ip != '' )
| AND ( media_domain != '' )
|GROUP BY
| s1_s_ip,
| media_domain
""".stripMargin)
/**
* Acquire the ArangoDB connection
*/
arangoDB = new ArangoDB.Builder()
.maxConnections(Config.MAXPOOLSIZE)
.host("192.168.40.127", 8529)
.user("root")
.password("111111")
.build
/**
* Update FQDN vertices
*/
v_FQDN_DF.printSchema()
v_FQDN_DF.foreachPartition(iter => {
val v_FQDN_Coll = arangoDB.db("insert_iplearn_index").collection("V_FQDN")
val docs_Insert = new java.util.ArrayList[BaseDocument]()
val docs_Update = new java.util.ArrayList[BaseDocument]()
var i = 0
iter.foreach(row => {
val fqdn = row.getAs[String]("FQDN_NAME")
val v_Fqdn_First = row.getAs[Long]("FQDN_FIRST_FOUND_TIME")
val v_Fqdn_Last = row.getAs[Long]("FQDN_LAST_FOUND_TIME")
val v_Fqdn_Cnt = row.getAs[Long]("FQDN_COUNT_TOTAL")
if (v_FQDN_Coll.documentExists(fqdn)) {
val document: BaseDocument = v_FQDN_Coll.getDocument(fqdn, classOf[BaseDocument])
val fqdn_Cnt = Try(document.getAttribute("FQDN_COUNT_TOTAL")).getOrElse(0).toString.toInt
document.addAttribute("FQDN_COUNT_TOTAL", fqdn_Cnt)
document.addAttribute("FQDN_LAST_FOUND_TIME", v_Fqdn_Last)
docs_Update.add(document)
} else {
val baseDocument: BaseDocument = new BaseDocument()
baseDocument.setKey(fqdn)
baseDocument.addAttribute("FQDN_NAME", fqdn)
baseDocument.addAttribute("FQDN_FIRST_FOUND_TIME", v_Fqdn_First)
baseDocument.addAttribute("FQDN_LAST_FOUND_TIME", v_Fqdn_Last)
baseDocument.addAttribute("FQDN_COUNT_TOTAL", v_Fqdn_Cnt)
docs_Insert.add(baseDocument)
}
i+=1
})
Try(v_FQDN_Coll.importDocuments(docs_Insert))
Try(v_FQDN_Coll.updateDocuments(docs_Update))
})
/**
* Update IP vertices
*/
v_IP_DF.printSchema()
v_IP_DF.foreachPartition(iter => {
val v_IP_Coll = arangoDB.db("insert_iplearn_index").collection("V_IP")
val docs_Insert: java.util.ArrayList[BaseDocument] = new java.util.ArrayList[BaseDocument]()
val docs_Update: java.util.ArrayList[BaseDocument] = new java.util.ArrayList[BaseDocument]()
var i = 0
iter.foreach(row => {
val ip = row.getAs[String]("IP")
val location = row.getAs[String]("location")
val v_IP_First = row.getAs[Long]("IP_FIRST_FOUND_TIME")
val v_IP_Last = row.getAs[Long]("IP_LAST_FOUND_TIME")
val v_IP_Cnt = row.getAs[Long]("IP_COUNT_TOTAL")
if (v_IP_Coll.documentExists(ip)) {
val document: BaseDocument = v_IP_Coll.getDocument(ip, classOf[BaseDocument])
val ip_Cnt = Try(document.getAttribute("IP_APPEAR_COUNT")).getOrElse(0).toString.toInt
document.addAttribute("LAST_FOUND_TIME", v_IP_Last)
document.addAttribute("IP_APPEAR_COUNT", v_IP_Cnt+ip_Cnt)
docs_Update.add(document)
} else {
val baseDocument: BaseDocument = new BaseDocument()
baseDocument.setKey(ip)
baseDocument.addAttribute("IP", ip)
baseDocument.addAttribute("IP_LOCATION", location)
baseDocument.addAttribute("FIRST_FOUND_TIME", v_IP_First)
baseDocument.addAttribute("LAST_FOUND_TIME", v_IP_Last)
baseDocument.addAttribute("IP_APPEAR_COUNT", v_IP_Cnt)
docs_Insert.add(baseDocument)
}
i+=1
})
Try(v_IP_Coll.importDocuments(docs_Insert))
Try(v_IP_Coll.updateDocuments(docs_Update))
})
/**
* Aggregate the e_Address_Fqdn_to_IP edges
*/
e_Address_v_FQDN_to_v_IP_DF.printSchema()
e_Address_v_FQDN_to_v_IP_DF.foreachPartition(iter => {
val e_Add_Fqdn_to_IP_Coll = arangoDB.db("insert_iplearn_index").collection("E_ADDRESS_V_FQDN_TO_V_IP")
val docs_Insert: java.util.ArrayList[BaseEdgeDocument] = new java.util.ArrayList[BaseEdgeDocument]()
val docs_Update: java.util.ArrayList[BaseEdgeDocument] = new java.util.ArrayList[BaseEdgeDocument]()
var i = 0
iter.foreach(row => {
val fqdn = row.getAs[String]("V_FQDN")
val ip = row.getAs[String]("V_IP")
val e_First = row.getAs[Long]("FIRST_FOUND_TIME")
val e_Last = row.getAs[Long]("LAST_FOUND_TIME")
val e_Cnt = row.getAs[Long]("COUNT_TOTAL")
if (e_Add_Fqdn_to_IP_Coll.documentExists(fqdn+"-"+ip)) {
val document: BaseEdgeDocument = e_Add_Fqdn_to_IP_Coll.getDocument(fqdn+"-"+ip, classOf[BaseEdgeDocument])
val e_new_Cnt = Try(document.getAttribute("COUNT_TOTAL")).getOrElse(0).toString.toInt
document.addAttribute("LAST_FOUND_TIME", e_Last)
document.addAttribute("COUNT_TOTAL", e_new_Cnt+e_Cnt)
docs_Update.add(document)
} else {
val baseDocument: BaseEdgeDocument = new BaseEdgeDocument()
baseDocument.setKey(fqdn+"-"+ip)
baseDocument.setFrom(s"V_FQDN/$fqdn")
baseDocument.setTo(s"V_IP/$ip")
baseDocument.addAttribute("COUNT_TOTAL",e_Cnt)
baseDocument.addAttribute("FIRST_FOUND_TIME",e_First)
baseDocument.addAttribute("LAST_FOUND_TIME",e_Last)
docs_Insert.add(baseDocument)
}
// println(fqdn+"-"+ip)
})
Try(e_Add_Fqdn_to_IP_Coll.importDocuments(docs_Insert))
Try(e_Add_Fqdn_to_IP_Coll.updateDocuments(docs_Update))
})
/**
* Aggregate the e_Visit_v_IP_to_v_FQDN edges
*/
e_Visit_v_IP_to_v_FQDN_DF.printSchema()
e_Visit_v_IP_to_v_FQDN_DF.foreachPartition(iter => {
val e_Visit_Fqdn_to_IP_Coll = arangoDB.db("insert_iplearn_index").collection("E_VISIT_V_IP_TO_V_FQDN")
val docs_Insert: java.util.ArrayList[BaseEdgeDocument] = new java.util.ArrayList[BaseEdgeDocument]()
val docs_Update: java.util.ArrayList[BaseEdgeDocument] = new java.util.ArrayList[BaseEdgeDocument]()
var i = 0
iter.foreach(row => {
val fqdn = row.getAs[String]("V_FQDN")
val ip = row.getAs[String]("V_IP")
val e_First = row.getAs[Long]("FIRST_FOUND_TIME")
val e_Last = row.getAs[Long]("LAST_FOUND_TIME")
val e_Cnt = row.getAs[Long]("COUNT_TOTAL")
if (e_Visit_Fqdn_to_IP_Coll.documentExists(ip+"-"+fqdn)) {
val document: BaseEdgeDocument = e_Visit_Fqdn_to_IP_Coll.getDocument(ip+"-"+fqdn, classOf[BaseEdgeDocument])
val e_new_Cnt = Try(document.getAttribute("COUNT_TOTAL")).getOrElse(0).toString.toInt
document.addAttribute("LAST_FOUND_TIME", e_Last)
document.addAttribute("COUNT_TOTAL", e_new_Cnt+e_Cnt)
docs_Update.add(document)
} else {
val baseDocument: BaseEdgeDocument = new BaseEdgeDocument()
baseDocument.setKey(ip+"-"+fqdn)
baseDocument.setFrom(s"V_IP/$ip")
baseDocument.setTo(s"V_FQDN/$fqdn")
baseDocument.addAttribute("COUNT_TOTAL",e_Cnt)
baseDocument.addAttribute("FIRST_FOUND_TIME",e_First)
baseDocument.addAttribute("LAST_FOUND_TIME",e_Last)
docs_Insert.add(baseDocument)
}
i+=1
})
Try(e_Visit_Fqdn_to_IP_Coll.importDocuments(docs_Insert))
Try(e_Visit_Fqdn_to_IP_Coll.updateDocuments(docs_Update))
})
arangoDB.shutdown()
}
}

View File

@@ -0,0 +1,355 @@
package cn.ac.iie.test
import cn.ac.iie.utils.ConfigUtils
import com.arangodb.entity.{BaseDocument, BaseEdgeDocument}
import com.arangodb.model.AqlQueryOptions
import com.arangodb.util.MapBuilder
import com.arangodb.{ArangoCursor, ArangoDB}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
import scala.util.Try
object ArangoDbTestMemory {
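// Memory-based variant: each ArangoDB collection is first pulled through an AQL cursor into a
// HashMap keyed by document key, broadcast to the executors, and looked up inside
// foreachPartition instead of calling documentExists per row.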
private val logger: Logger = LoggerFactory.getLogger(ArangoDbTestMemory.getClass)
@transient
var arangoDB: ArangoDB = _
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.appName("test")
.config("spark.serializer", Config.SPARK_SERIALIZER)
.config("spark.network.timeout", "300s")
.config("spark.sql.shuffle.partitions", Config.SPARK_SQL_SHUFFLE_PARTITIONS)
.config("spark.executor.memory", Config.SPARK_EXECUTOR_MEMORY)
.master(Config.MASTER)
.getOrCreate()
logger.warn("sparkession获取成功")
val mediaDataFrame: DataFrame = spark.read.format("jdbc")
.option("url", "jdbc:clickhouse://192.168.40.193:8123")
.option("dbtable", s"(select media_domain,recv_time,s1_s_ip,s1_d_ip,s1_s_location_region,s1_d_location_region from av_miner.media_expire_patch where recv_time>=${Config.MINTIME} and recv_time<=${Config.MAXTIME})")
.option("driver", "ru.yandex.clickhouse.ClickHouseDriver")
.option("user", "default")
.option("password", "111111")
.option("numPartitions", Config.NUMPARTITIONS)
.option("partitionColumn", "recv_time")
.option("lowerBound", Config.MINTIME)
.option("upperBound", Config.MAXTIME)
.option("fetchsize", Config.SPARK_SQL_READ_FETCHSIZE)
.option("socket_timeout",Config.CLICKHOUSE_SOCKET_TIMEOUT)
.load()
mediaDataFrame.printSchema()
mediaDataFrame.createOrReplaceGlobalTempView("media_expire_patch")
val v_FQDN_DF = spark.sql(
"""
|SELECT
| media_domain AS FQDN_NAME,
| MIN( recv_time ) AS FQDN_FIRST_FOUND_TIME,
| MAX( recv_time ) AS FQDN_LAST_FOUND_TIME,
| COUNT( * ) AS FQDN_COUNT_TOTAL
|FROM
| global_temp.media_expire_patch
|WHERE
| media_domain != ''
|GROUP BY
| media_domain
""".stripMargin
)
val s_IP_DF = spark.sql(
"""
select
s1_s_ip as IP,
s1_s_location_region as location,
MIN( recv_time ) AS IP_FIRST_FOUND_TIME,
MAX( recv_time ) AS IP_LAST_FOUND_TIME,
COUNT( * ) AS IP_COUNT_TOTAL
from global_temp.media_expire_patch
GROUP BY
IP,
location
""".stripMargin)
val d_IP_DF = spark.sql(
"""
select
s1_d_ip as IP,
s1_d_location_region as location,
MIN( recv_time ) AS IP_FIRST_FOUND_TIME,
MAX( recv_time ) AS IP_LAST_FOUND_TIME,
COUNT( * ) AS IP_COUNT_TOTAL
from global_temp.media_expire_patch
GROUP BY
IP,
location
""".stripMargin)
import org.apache.spark.sql.functions._
val v_IP_DF = s_IP_DF.union(d_IP_DF).groupBy("IP", "location").agg(
min("IP_FIRST_FOUND_TIME").as("IP_FIRST_FOUND_TIME"),
max("IP_LAST_FOUND_TIME").as("IP_LAST_FOUND_TIME"),
count("IP").as("IP_COUNT_TOTAL")
)
val e_Address_v_FQDN_to_v_IP_DF: DataFrame = spark.sql(
"""
|SELECT
| media_domain AS V_FQDN,
| s1_d_ip AS V_IP,
| MIN( recv_time ) AS FIRST_FOUND_TIME,
| MAX( recv_time ) AS LAST_FOUND_TIME,
| COUNT( * ) AS COUNT_TOTAL
|FROM
| global_temp.media_expire_patch
|WHERE
| ( media_domain != '' )
| AND ( s1_d_ip != '' )
|GROUP BY
| s1_d_ip,
| media_domain
""".stripMargin)
val e_Visit_v_IP_to_v_FQDN_DF = spark.sql(
"""
|SELECT
| s1_s_ip AS V_IP,
| media_domain AS V_FQDN,
| MIN( recv_time ) AS FIRST_FOUND_TIME,
| MAX( recv_time ) AS LAST_FOUND_TIME,
| COUNT( * ) AS COUNT_TOTAL
|FROM
| global_temp.media_expire_patch
|WHERE
| ( s1_s_ip != '' )
| AND ( media_domain != '' )
|GROUP BY
| s1_s_ip,
| media_domain
""".stripMargin)
/**
* Acquire the ArangoDB connection
*/
arangoDB = new ArangoDB.Builder()
.maxConnections(Config.MAXPOOLSIZE)
.host(ConfigUtils.ARANGODB_HOST, ConfigUtils.ARANGODB_PORT)
.user(ConfigUtils.ARANGODB_USER)
.password(ConfigUtils.ARANGODB_PASSWORD)
.build
val bindVars = new MapBuilder().get
val options = new AqlQueryOptions()
.ttl(Config.ARANGODB_TTL)
val v_FQDN_Mutabal_Map = new java.util.HashMap[String,BaseDocument](1048576,0.9f)
val v_IP_Mutabal_Map = new java.util.HashMap[String,BaseDocument](16048576,0.9f)
val e_FQDN_Address_IP_Mutabal_Map = new java.util.HashMap[String,BaseEdgeDocument](1048576,0.9f)
val e_IP_Visit_FQDN_Mutabal_Map = new java.util.HashMap[String,BaseEdgeDocument](30408576,0.9f)
/**
* Update FQDN vertices
*/
val v_FQDN_Query = "FOR doc IN V_FQDN RETURN doc"
val v_FQDN_Cursor: ArangoCursor[BaseDocument] = arangoDB.db(ConfigUtils.ARANGODB_DB_NAME)
.query(v_FQDN_Query, bindVars, options, classOf[BaseDocument])
while (v_FQDN_Cursor.hasNext){
val document = v_FQDN_Cursor.next()
v_FQDN_Mutabal_Map.put(document.getKey,document)
}
val v_FQDN_Map= spark.sparkContext.broadcast(v_FQDN_Mutabal_Map)
v_FQDN_Mutabal_Map.clear()
v_FQDN_DF.show(20)
v_FQDN_DF.printSchema()
v_FQDN_DF.coalesce(Config.REPARTITION_NUMBER).foreachPartition(iter => {
val v_FQDN_Coll = arangoDB.db(ConfigUtils.ARANGODB_DB_NAME).collection("V_FQDN")
val docs_Insert = new java.util.ArrayList[BaseDocument]()
val docs_Update = new java.util.ArrayList[BaseDocument]()
var i = 0
iter.foreach(row => {
val fqdn = row.getAs[String]("FQDN_NAME")
val v_Fqdn_First = row.getAs[Long]("FQDN_FIRST_FOUND_TIME")
val v_Fqdn_Last = row.getAs[Long]("FQDN_LAST_FOUND_TIME")
val v_Fqdn_Cnt = row.getAs[Long]("FQDN_COUNT_TOTAL")
val doc = v_FQDN_Map.value.getOrDefault(fqdn, null)
if (doc != null) {
val document: BaseDocument = doc
val fqdn_Cnt = Try(document.getAttribute("FQDN_COUNT_TOTAL")).getOrElse(0).toString.toInt
document.addAttribute("FQDN_COUNT_TOTAL", fqdn_Cnt)
document.addAttribute("LAST_FOUND_TIME", v_Fqdn_Last)
docs_Update.add(document)
} else {
val baseDocument: BaseDocument = new BaseDocument()
baseDocument.setKey(fqdn)
baseDocument.addAttribute("FQDN_NAME", fqdn)
baseDocument.addAttribute("FIRST_FOUND_TIME", v_Fqdn_First)
baseDocument.addAttribute("LAST_FOUND_TIME", v_Fqdn_Last)
baseDocument.addAttribute("FQDN_COUNT_TOTAL", v_Fqdn_Cnt)
docs_Insert.add(baseDocument)
}
})
// Try(v_FQDN_Coll.importDocuments(docs_Insert))
v_FQDN_Coll.importDocuments(docs_Insert)
Try(v_FQDN_Coll.updateDocuments(docs_Update))
})
v_FQDN_Map.destroy()
/**
* Update IP vertices
*/
val v_IP_Query = "FOR doc IN V_IP RETURN doc"
val v_IP_Cursor: ArangoCursor[BaseDocument] = arangoDB.db(ConfigUtils.ARANGODB_DB_NAME)
.query(v_IP_Query, bindVars, options, classOf[BaseDocument])
while (v_IP_Cursor.hasNext){
val document = v_IP_Cursor.next()
v_IP_Mutabal_Map.put(document.getKey ,document)
}
val v_IP_Map = spark.sparkContext.broadcast(v_IP_Mutabal_Map)
// val v_IP_Map = v_IP_Mutabal_Map.toMap
v_IP_Mutabal_Map.clear()
v_IP_DF.printSchema()
v_IP_DF.coalesce(Config.REPARTITION_NUMBER).foreachPartition(iter => {
val v_IP_Coll = arangoDB.db(ConfigUtils.ARANGODB_DB_NAME).collection("V_IP")
val docs_Insert: java.util.ArrayList[BaseDocument] = new java.util.ArrayList[BaseDocument]()
val docs_Update: java.util.ArrayList[BaseDocument] = new java.util.ArrayList[BaseDocument]()
var i = 0
iter.foreach(row => {
val ip = row.getAs[String]("IP")
val location = row.getAs[String]("location")
val v_IP_First = row.getAs[Long]("IP_FIRST_FOUND_TIME")
val v_IP_Last = row.getAs[Long]("IP_LAST_FOUND_TIME")
val v_IP_Cnt = row.getAs[Long]("IP_COUNT_TOTAL")
val doc = v_IP_Map.value.getOrDefault(ip, null)
if (doc != null) {
val document: BaseDocument = doc
val ip_Cnt = Try(document.getAttribute("IP_APPEAR_COUNT")).getOrElse(0).toString.toInt
document.addAttribute("LAST_FOUND_TIME", v_IP_Last)
document.addAttribute("IP_APPEAR_COUNT", v_IP_Cnt+ip_Cnt)
docs_Update.add(document)
} else {
val baseDocument: BaseDocument = new BaseDocument()
baseDocument.setKey(ip)
baseDocument.addAttribute("IP", ip)
baseDocument.addAttribute("IP_LOCATION", location)
baseDocument.addAttribute("FIRST_FOUND_TIME", v_IP_First)
baseDocument.addAttribute("LAST_FOUND_TIME", v_IP_Last)
baseDocument.addAttribute("IP_APPEAR_COUNT", v_IP_Cnt)
docs_Insert.add(baseDocument)
}
})
Try(v_IP_Coll.importDocuments(docs_Insert))
Try(v_IP_Coll.updateDocuments(docs_Update))
})
v_IP_Map.destroy()
/**
* Aggregate the e_Address_Fqdn_to_IP edges
*/
val e_FQDN_Address_IP_Query = "FOR doc IN E_ADDRESS_V_FQDN_TO_V_IP RETURN doc"
val e_FQDN_Address_IP_Cursor: ArangoCursor[BaseEdgeDocument] = arangoDB.db(ConfigUtils.ARANGODB_DB_NAME)
.query(e_FQDN_Address_IP_Query, bindVars, options, classOf[BaseEdgeDocument])
while (e_FQDN_Address_IP_Cursor.hasNext){
val document = e_FQDN_Address_IP_Cursor.next()
e_FQDN_Address_IP_Mutabal_Map.put(document.getKey ,document)
}
val e_FQDN_Address_IP_Map = spark.sparkContext.broadcast(e_FQDN_Address_IP_Mutabal_Map)
e_FQDN_Address_IP_Mutabal_Map.clear()
e_Address_v_FQDN_to_v_IP_DF.printSchema()
e_Address_v_FQDN_to_v_IP_DF.coalesce(Config.REPARTITION_NUMBER).foreachPartition(iter => {
val e_Add_Fqdn_to_IP_Coll = arangoDB.db(ConfigUtils.ARANGODB_DB_NAME).collection("E_ADDRESS_V_FQDN_TO_V_IP")
val docs_Insert: java.util.ArrayList[BaseEdgeDocument] = new java.util.ArrayList[BaseEdgeDocument]()
val docs_Update: java.util.ArrayList[BaseEdgeDocument] = new java.util.ArrayList[BaseEdgeDocument]()
var i = 0
iter.foreach(row => {
val fqdn = row.getAs[String]("V_FQDN")
val ip = row.getAs[String]("V_IP")
val e_First = row.getAs[Long]("FIRST_FOUND_TIME")
val e_Last = row.getAs[Long]("LAST_FOUND_TIME")
val e_Cnt = row.getAs[Long]("COUNT_TOTAL")
val doc = e_FQDN_Address_IP_Map.value.getOrDefault(fqdn+"-"+ip, null)
if (doc != null) {
val document: BaseEdgeDocument = doc
val e_new_Cnt = Try(document.getAttribute("COUNT_TOTAL")).getOrElse(0).toString.toInt
document.setFrom(s"V_FQDN/$fqdn")
document.setTo(s"V_IP/$ip")
document.addAttribute("LAST_FOUND_TIME", e_Last)
document.addAttribute("COUNT_TOTAL", e_new_Cnt+e_Cnt)
docs_Update.add(document)
} else {
val baseDocument: BaseEdgeDocument = new BaseEdgeDocument()
baseDocument.setKey(fqdn+"-"+ip)
baseDocument.setFrom(s"V_FQDN/$fqdn")
baseDocument.setTo(s"V_IP/$ip")
baseDocument.addAttribute("COUNT_TOTAL",e_Cnt)
baseDocument.addAttribute("FIRST_FOUND_TIME",e_First)
baseDocument.addAttribute("LAST_FOUND_TIME",e_Last)
docs_Insert.add(baseDocument)
}
})
Try(e_Add_Fqdn_to_IP_Coll.importDocuments(docs_Insert))
Try(e_Add_Fqdn_to_IP_Coll.updateDocuments(docs_Update))
})
e_FQDN_Address_IP_Map.destroy()
/**
* Aggregate the e_Visit_v_IP_to_v_FQDN edges
*/
val e_IP_Visit_FQDN_Query = "FOR doc IN E_VISIT_V_IP_TO_V_FQDN RETURN doc"
val e_IP_Visit_FQDN_Cursor: ArangoCursor[BaseEdgeDocument] = arangoDB.db(ConfigUtils.ARANGODB_DB_NAME)
.query(e_IP_Visit_FQDN_Query, bindVars, options, classOf[BaseEdgeDocument])
while (e_IP_Visit_FQDN_Cursor.hasNext){
val document = e_IP_Visit_FQDN_Cursor.next()
e_IP_Visit_FQDN_Mutabal_Map.put(document.getKey ,document)
}
val e_IP_Visit_FQDN_Map = spark.sparkContext.broadcast(e_IP_Visit_FQDN_Mutabal_Map)
e_IP_Visit_FQDN_Mutabal_Map.clear()
e_Visit_v_IP_to_v_FQDN_DF.printSchema()
e_Visit_v_IP_to_v_FQDN_DF.coalesce(Config.REPARTITION_NUMBER).foreachPartition(iter => {
val e_Visit_Fqdn_to_IP_Coll = arangoDB.db(ConfigUtils.ARANGODB_DB_NAME).collection("E_VISIT_V_IP_TO_V_FQDN")
val docs_Insert: java.util.ArrayList[BaseEdgeDocument] = new java.util.ArrayList[BaseEdgeDocument]()
val docs_Update: java.util.ArrayList[BaseEdgeDocument] = new java.util.ArrayList[BaseEdgeDocument]()
iter.foreach(row => {
val fqdn = row.getAs[String]("V_FQDN")
val ip = row.getAs[String]("V_IP")
val e_First = row.getAs[Long]("FIRST_FOUND_TIME")
val e_Last = row.getAs[Long]("LAST_FOUND_TIME")
val e_Cnt = row.getAs[Long]("COUNT_TOTAL")
val doc = e_IP_Visit_FQDN_Map.value.getOrDefault(ip+"-"+fqdn, null)
if (doc != null) {
val document: BaseEdgeDocument = doc
val e_new_Cnt = Try(document.getAttribute("COUNT_TOTAL").toString.toInt).getOrElse(0)
document.addAttribute("LAST_FOUND_TIME", e_Last)
document.addAttribute("COUNT_TOTAL", e_new_Cnt+e_Cnt)
docs_Update.add(document)
} else {
val baseDocument: BaseEdgeDocument = new BaseEdgeDocument()
baseDocument.setKey(ip+"-"+fqdn)
baseDocument.setFrom("V_IP/"+ip)
baseDocument.setTo("V_FQDN/"+fqdn)
baseDocument.addAttribute("COUNT_TOTAL",e_Cnt)
baseDocument.addAttribute("FIRST_FOUND_TIME",e_First)
baseDocument.addAttribute("LAST_FOUND_TIME",e_Last)
docs_Insert.add(baseDocument)
}
})
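// Same pattern for the visit edges: import the new IP->FQDN edges, update the rest.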
Try(e_Visit_Fqdn_to_IP_Coll.importDocuments(docs_Insert))
Try(e_Visit_Fqdn_to_IP_Coll.updateDocuments(docs_Update))
})
e_IP_Visit_FQDN_Map.destroy()
arangoDB.shutdown()
spark.close()
}
}

View File

@@ -0,0 +1,40 @@
package cn.ac.iie.test
import cn.ac.iie.dao.{BaseMediaDataLoad, UpdateArangoGraphByDF}
import cn.ac.iie.utils.{ConfigUtils, InitArangoDBPool}
import org.apache.spark.sql.SparkSession
import org.slf4j.{Logger, LoggerFactory}
object ArangoDbTestMemoryGroupBy {
private val logger: Logger = LoggerFactory.getLogger(ArangoDbTestMemoryGroupBy.getClass)
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.appName(ConfigUtils.SPARK_APP_NAME)
.config("spark.serializer", ConfigUtils.SPARK_SERIALIZER)
.config("spark.network.timeout", ConfigUtils.SPARK_NETWORK_TIMEOUT)
.config("spark.sql.shuffle.partitions", ConfigUtils.SPARK_SQL_SHUFFLE_PARTITIONS)
.config("spark.executor.memory", ConfigUtils.SPARK_EXECUTOR_MEMORY)
.master(ConfigUtils.MASTER)
.getOrCreate()
logger.warn("SparkSession obtained successfully")
BaseMediaDataLoad.loadMediaDate(spark)
val v_FQDN_DF = BaseMediaDataLoad.getFQDNVertexFromMedia(spark)
val v_IP_DF = BaseMediaDataLoad.getIPVertexFromMedia(spark)
val e_Address_v_FQDN_to_v_IP_DF = BaseMediaDataLoad.getFQDNAddressIPEdgeFromMedia(spark)
val e_Visit_v_IP_to_v_FQDN_DF = BaseMediaDataLoad.getIPVisitFQDNEdgeFromMedia(spark)
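// Push the aggregated graph into ArangoDB: vertex collections first,
// then the address and visit edge collections.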
UpdateArangoGraphByDF.updateFQDNVertex(v_FQDN_DF,spark)
UpdateArangoGraphByDF.updateIPVertex(v_IP_DF,spark)
UpdateArangoGraphByDF.updateFQDNAddressIPEdge(e_Address_v_FQDN_to_v_IP_DF,spark)
UpdateArangoGraphByDF.updateIPVisitFQDNEdge(e_Visit_v_IP_to_v_FQDN_DF,spark)
InitArangoDBPool.arangoDB.shutdown()
spark.close()
}
}

View File

@@ -0,0 +1,22 @@
package cn.ac.iie.test
import com.typesafe.config.{Config, ConfigFactory}
object Config {
private lazy val config: Config = ConfigFactory.load()
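// Keys expected in application.conf; the values below are only an illustrative
// sketch (actual values are deployment-specific):
//   spark.sql.shuffle.partitions = "200"
//   spark.sql.read.fetchsize = "10000"
//   spark.executor.memory = "4g"
//   spark.serializer = "org.apache.spark.serializer.KryoSerializer"
//   numPartitions = "10"
//   master = "local[*]"
//   maxPoolSize = 10
//   minTime = "1590940800"
//   maxTime = "1593532800"
//   repartitionNumber = 20
//   arangoDB.batch = 10000
//   arangoDB.ttl = 600
//   clickhouse.socket.timeout = 300000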
val SPARK_SQL_SHUFFLE_PARTITIONS: String = config.getString("spark.sql.shuffle.partitions")
val SPARK_SQL_READ_FETCHSIZE: String = config.getString("spark.sql.read.fetchsize")
val SPARK_EXECUTOR_MEMORY: String = config.getString("spark.executor.memory")
val NUMPARTITIONS: String = config.getString("numPartitions")
val MASTER: String = config.getString("master")
val MAXPOOLSIZE: Int = config.getInt("maxPoolSize")
val MINTIME: String = config.getString("minTime")
val MAXTIME: String = config.getString("maxTime")
val REPARTITION_NUMBER: Int = config.getInt("repartitionNumber")
val ARANGODB_BATCH: Int = config.getInt("arangoDB.batch")
val ARANGODB_TTL: Int = config.getInt("arangoDB.ttl")
val CLICKHOUSE_SOCKET_TIMEOUT: Int = config.getInt("clickhouse.socket.timeout")
val SPARK_SERIALIZER: String = config.getString("spark.serializer")
}

View File

@@ -0,0 +1,447 @@
package cn.ac.iie.test
import com.orientechnologies.orient.core.db.{ODatabasePool, OPartitionedDatabasePool}
import com.orientechnologies.orient.core.sql.OCommandSQL
import com.tinkerpop.blueprints.impls.orient.{OrientGraph, OrientGraphFactory}
import com.tinkerpop.blueprints.{Direction, Edge, Vertex}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
import scala.util.Try
object ReadClickhouseTest {
private val logger: Logger = LoggerFactory.getLogger(ReadClickhouseTest.getClass)
@transient
var factory: OrientGraphFactory = _
@transient
var pool: ODatabasePool = _
def main(args: Array[String]): Unit = {
// val warehouseLocation = new File("spark-warehouse").getAbsolutePath
val spark: SparkSession = SparkSession
.builder()
.appName("test")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// .config("spark.kryo.classesToRegister","com.tinkerpop.blueprints.impls.orient.OrientGraphFactory")
.config("spark.network.timeout", "300s")
.config("spark.sql.shuffle.partitions", Config.SPARK_SQL_SHUFFLE_PARTITIONS)
.config("spark.executor.memory", Config.SPARK_EXECUTOR_MEMORY)
.master(Config.MASTER)
/*
.config("spark.driver.host", "192.168.41.79")
.config("spark.jars", "D:\\GITREPO\\ip-learning\\target\\ip-learning-1.0-SNAPSHOT-jar-with-dependencies.jar")
.master("spark://192.168.40.119:7077")
*/
.getOrCreate()
logger.warn("SparkSession obtained successfully")
// val sql = "(select * from av_miner.media_expire_patch_local limit 1000)"
val mediaDataFrame: DataFrame = spark.read.format("jdbc")
.option("url", "jdbc:clickhouse://192.168.40.193:8123")
// .option("dbtable", "av_miner.media_expire_patch")
// .option("dbtable", "(select * from av_miner.media_expire_patch limit 10)")
// .option("dbtable","(select media_domain,recv_time,s1_s_ip,s1_d_ip,s1_s_location_region,s1_d_location_region from av_miner.media_expire_patch where media_domain not LIKE '%\\n%')")
.option("dbtable",s"(select media_domain,recv_time,s1_s_ip,s1_d_ip,s1_s_location_region,s1_d_location_region from av_miner.media_expire_patch where recv_time>=${Config.MINTIME} and recv_time<=${Config.MAXTIME})")
// .option("dbtable","(select media_domain,s1_s_ip,s1_d_ip,s1_s_location_region,s1_d_location_region,min(recv_time) as min_recv_time,max(recv_time) as max_recv_time from av_miner.media_expire_patch group by media_domain,s1_s_ip,s1_d_ip,s1_s_location_region,s1_d_location_region limit 10)")
.option("driver", "ru.yandex.clickhouse.ClickHouseDriver")
.option("user", "default")
.option("password", "111111")
.option("numPartitions", Config.NUMPARTITIONS)
.option("partitionColumn", "recv_time")
.option("lowerBound", Config.MINTIME)
.option("upperBound", Config.MAXTIME)
.option("fetchsize", Config.SPARK_SQL_READ_FETCHSIZE)
.load()
// mediaDataFrame.printSchema()
mediaDataFrame.createOrReplaceGlobalTempView("media_expire_patch")
// val mediaDataGlobalView = spark.sql("select * from global_temp.media_expire_patch limit 10")
// mediaDataGlobalView.show()
val v_FQDN_DF = spark.sql(
"""
|SELECT
| media_domain AS FQDN_NAME,
| MIN( recv_time ) AS FQDN_FIRST_FOUND_TIME,
| MAX( recv_time ) AS FQDN_LAST_FOUND_TIME,
| COUNT( * ) AS FQDN_COUNT_TOTAL
|FROM
| global_temp.media_expire_patch
|WHERE
| media_domain != ''
|GROUP BY
| media_domain
""".stripMargin
)
val s_IP_DF = spark.sql(
"""
select
s1_s_ip as IP,
s1_s_location_region as location,
MIN( recv_time ) AS IP_FIRST_FOUND_TIME,
MAX( recv_time ) AS IP_LAST_FOUND_TIME,
COUNT( * ) AS IP_COUNT_TOTAL
from global_temp.media_expire_patch
GROUP BY
IP,
location
""".stripMargin)
val d_IP_DF = spark.sql(
"""
select
s1_d_ip as IP,
s1_d_location_region as location,
MIN( recv_time ) AS IP_FIRST_FOUND_TIME,
MAX( recv_time ) AS IP_LAST_FOUND_TIME,
COUNT( * ) AS IP_COUNT_TOTAL
from global_temp.media_expire_patch
GROUP BY
IP,
location
""".stripMargin)
import org.apache.spark.sql.functions._
val v_IP_DF = s_IP_DF.union(d_IP_DF).groupBy("IP","location").agg(
min("IP_FIRST_FOUND_TIME") .as("IP_FIRST_FOUND_TIME"),
max("IP_LAST_FOUND_TIME").as("IP_LAST_FOUND_TIME"),
count("IP").as("IP_COUNT_TOTAL")
)
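// An IP may appear as both source and destination, so the two per-role
// aggregates are unioned and re-aggregated into one row per (IP, location).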
/*
val v_IP_DF = spark.sql(
"""
|SELECT
| IP,
| location,
| MIN( recv_time ) AS IP_FIRST_FOUND_TIME,
| MAX( recv_time ) AS IP_LAST_FOUND_TIME,
| COUNT( * ) AS IP_COUNT_TOTAL
|FROM
| (
| ( SELECT s1_s_ip AS IP, s1_s_location_region AS location, recv_time FROM global_temp.media_expire_patch ) UNION ALL
| ( SELECT s1_d_ip AS IP, s1_d_location_region AS location, recv_time FROM global_temp.media_expire_patch )
| )
|GROUP BY
| IP,
| location
""".stripMargin)
*/
val e_Address_v_FQDN_to_v_IP_DF = spark.sql(
"""
|SELECT
| media_domain AS V_FQDN,
| s1_d_ip AS V_IP,
| MIN( recv_time ) AS FIRST_FOUND_TIME,
| MAX( recv_time ) AS LAST_FOUND_TIME,
| COUNT( * ) AS COUNT_TOTAL
|FROM
| global_temp.media_expire_patch
|WHERE
| ( media_domain != '' )
| AND ( s1_d_ip != '' )
|GROUP BY
| s1_d_ip,
| media_domain
""".stripMargin)
val e_Visit_v_IP_to_v_FQDN_DF = spark.sql(
"""
|SELECT
| s1_s_ip AS V_IP,
| media_domain AS V_FQDN,
| MIN( recv_time ) AS FIRST_FOUND_TIME,
| MAX( recv_time ) AS LAST_FOUND_TIME,
| COUNT( * ) AS COUNT_TOTAL
|FROM
| global_temp.media_expire_patch
|WHERE
| ( s1_s_ip != '' )
| AND ( media_domain != '' )
|GROUP BY
| s1_s_ip,
| media_domain
""".stripMargin)
/**
* Obtain the database connection pool
*/
val uri: String = "remote:192.168.40.127/iplearning-insert"
// val uri: String = "remote:192.168.40.207/iplearing-test"
// val uri: String = "remote:192.168.40.152:2424;192.168.40.151:2424:192.168.40.153:2424/iplearing-test"
val pool = new OPartitionedDatabasePool(uri, "root", "111111", Config.MAXPOOLSIZE, Config.MAXPOOLSIZE)
factory = new OrientGraphFactory(uri, "root", "111111", pool)
factory.setConnectionStrategy("ROUND_ROBIN_CONNECT")
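// Pooled OrientDB graph factory; ROUND_ROBIN_CONNECT rotates new connections
// across the servers listed in the URI (only a single host is configured here).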
/**
* Update the FQDN vertices
*/
v_FQDN_DF.printSchema()
v_FQDN_DF.foreach(row => {
val fqdn = row.getAs[String]("FQDN_NAME")
val v_Fqdn_First = row.getAs[Long]("FQDN_FIRST_FOUND_TIME")
val v_Fqdn_Last = row.getAs[Long]("FQDN_LAST_FOUND_TIME")
val v_Fqdn_Cnt = row.getAs[Long]("FQDN_COUNT_TOTAL")
// val graph = factory.getNoTx
val graph: OrientGraph = factory.getTx
var v_Fqdn_Obj: Vertex = null
import scala.collection.JavaConversions._
if (graph.getVertices("v_FQDN.FQDN_NAME", fqdn).nonEmpty) {
for (v: Vertex <- graph.getVertices("v_FQDN.FQDN_NAME", fqdn)) {
val update_Fqdn_Cnt = v.getProperty[Long]("FQDN_APPEAR_COUNT")
val sqlComm = new OCommandSQL(
s"UPDATE v_FQDN SET LAST_FOUND_TIME = $v_Fqdn_Last,FQDN_APPEAR_COUNT = ${update_Fqdn_Cnt + v_Fqdn_Cnt} WHERE FQDN_NAME = '$fqdn'")
Try(graph.command(sqlComm).execute())
println("update fqdn:"+fqdn)
v_Fqdn_Obj = v
}
} else {
v_Fqdn_Obj = graph.addVertex("class:v_FQDN", Nil: _*)
v_Fqdn_Obj.setProperty("FQDN_NAME", fqdn)
v_Fqdn_Obj.setProperty("FIRST_FOUND_TIME", v_Fqdn_First)
v_Fqdn_Obj.setProperty("LAST_FOUND_TIME", v_Fqdn_Last)
v_Fqdn_Obj.setProperty("FQDN_APPEAR_COUNT", v_Fqdn_Cnt)
println("insert fqdn:"+fqdn)
}
// A per-row counter reset inside the loop can never reach a batch threshold,
// so simply commit this row's transaction.
graph.commit()
})
factory.getTx.commit()
/**
* Update the IP vertices
*/
v_IP_DF.printSchema()
v_IP_DF.foreach(row => {
val ip = row.getAs[String]("IP")
val location = row.getAs[String]("location")
val v_IP_First = row.getAs[Long]("IP_FIRST_FOUND_TIME")
val v_IP_Last = row.getAs[Long]("IP_LAST_FOUND_TIME")
val v_IP_Cnt = row.getAs[Long]("IP_COUNT_TOTAL")
// val graph = factory.getNoTx
val graph = factory.getTx
var v_IP_Obj: Vertex = null
import scala.collection.JavaConversions._
if (graph.getVertices("v_IP.IP", ip).nonEmpty) {
for (v: Vertex <- graph.getVertices("v_IP.IP", ip)) {
val update_IP_Cnt = v.getProperty[Long]("IP_APPEAR_COUNT")
val sqlComm = new OCommandSQL(
s"UPDATE v_IP SET LAST_FOUND_TIME = $v_IP_Last,IP_APPEAR_COUNT = ${update_IP_Cnt + v_IP_Cnt} "
+ s"WHERE IP = '$ip'")
Try(graph.command(sqlComm).execute())
println("update ip:"+ip)
v_IP_Obj = v
}
} else {
v_IP_Obj = graph.addVertex("class:v_IP", Nil: _*)
v_IP_Obj.setProperty("IP", ip)
v_IP_Obj.setProperty("IP_LOCATION", location)
v_IP_Obj.setProperty("FIRST_FOUND_TIME", v_IP_First)
v_IP_Obj.setProperty("LAST_FOUND_TIME", v_IP_Last)
v_IP_Obj.setProperty("IP_APPEAR_COUNT", v_IP_Cnt)
println("insert ip:"+ip)
}
// Same as above: commit per row instead of relying on a counter that never fires.
graph.commit()
})
factory.getTx.commit()
/**
* Aggregate the e_Address_Fqdn_to_IP edges
*/
e_Address_v_FQDN_to_v_IP_DF.printSchema()
e_Address_v_FQDN_to_v_IP_DF.foreach(row => {
val fqdn = row.getAs[String]("V_FQDN")
val ip = row.getAs[String]("V_IP")
val e_First = row.getAs[Long]("FIRST_FOUND_TIME")
val e_Last = row.getAs[Long]("LAST_FOUND_TIME")
val e_Cnt = row.getAs[Long]("COUNT_TOTAL")
val graph = factory.getNoTx
var v_Fqdn_Obj: Vertex = null
var v_IP_Obj: Vertex = null
var e_Edge_Obj:Edge = null
import scala.collection.JavaConversions._
// Get the FQDN vertex
if (graph.getVertices("v_FQDN.FQDN_NAME", fqdn).nonEmpty) {
for (v: Vertex <- graph.getVertices("v_FQDN.FQDN_NAME", fqdn)) {
v_Fqdn_Obj = v
}
} else {
v_Fqdn_Obj = graph.addVertex("class:v_FQDN", Nil: _*)
v_Fqdn_Obj.setProperty("FQDN_NAME", fqdn)
v_Fqdn_Obj.setProperty("FIRST_FOUND_TIME", 0)
v_Fqdn_Obj.setProperty("LAST_FOUND_TIME", 0)
v_Fqdn_Obj.setProperty("FQDN_APPEAR_COUNT", 0)
}
// Get the IP vertex
if (graph.getVertices("v_IP.IP", ip).nonEmpty) {
for (v: Vertex <- graph.getVertices("v_IP.IP", ip)) {
v_IP_Obj = v
}
} else {
v_IP_Obj = graph.addVertex("class:v_IP", Nil: _*)
v_IP_Obj.setProperty("IP", ip)
v_IP_Obj.setProperty("IP_LOCATION", "")
v_IP_Obj.setProperty("FIRST_FOUND_TIME", 0)
v_IP_Obj.setProperty("LAST_FOUND_TIME", 0)
v_IP_Obj.setProperty("IP_APPEAR_COUNT", 0)
}
// println("e_address_egde:"+v_Fqdn_Obj.getProperty[String]("FQDN_NAME")+"-"+v_IP_Obj.getProperty[String]("IP"))
// Add or update the edge
for (e: Edge <- v_Fqdn_Obj.getEdges(Direction.OUT)) {
if (e.getVertex(Direction.IN).getProperty[String]("IP") == ip){
val cnt = e.getProperty[Long]("COUNT_TOTAL")
e.setProperty("COUNT_TOTAL",e_Cnt+cnt)
e.setProperty("LAST_FOUND_TIME",e_Last)
println("update e_address_egde:"+fqdn+"-"+ip)
e_Edge_Obj = e
}
}
if (e_Edge_Obj == null){
val newEdge = graph.addEdge(null, v_Fqdn_Obj, v_IP_Obj, "E_ADDRESS_V_FQDN_TO_V_IP")
newEdge.setProperty("COUNT_TOTAL",e_Cnt)
newEdge.setProperty("FIRST_FOUND_TIME",e_First)
newEdge.setProperty("LAST_FOUND_TIME",e_Last)
println("insert e_address_egde:"+fqdn+"-"+ip)
}
})
/**
* Aggregate the e_Visit_v_IP_to_v_FQDN edges
*/
e_Visit_v_IP_to_v_FQDN_DF.printSchema()
e_Visit_v_IP_to_v_FQDN_DF.foreach(row => {
val fqdn = row.getAs[String]("V_FQDN")
val ip = row.getAs[String]("V_IP")
val e_First = row.getAs[Long]("FIRST_FOUND_TIME")
val e_Last = row.getAs[Long]("LAST_FOUND_TIME")
val e_Cnt = row.getAs[Long]("COUNT_TOTAL")
val graph = factory.getNoTx
var v_Fqdn_Obj: Vertex = null
var v_IP_Obj: Vertex = null
var e_Edge_Obj:Edge = null
import scala.collection.JavaConversions._
// Get or create the FQDN vertex
if (graph.getVertices("v_FQDN.FQDN_NAME", fqdn).nonEmpty) {
for (v: Vertex <- graph.getVertices("v_FQDN.FQDN_NAME", fqdn)) {
v_Fqdn_Obj = v
}
} else {
v_Fqdn_Obj = graph.addVertex("class:v_FQDN", Nil: _*)
v_Fqdn_Obj.setProperty("IP", ip)
v_Fqdn_Obj.setProperty("FIRST_FOUND_TIME", 0)
v_Fqdn_Obj.setProperty("LAST_FOUND_TIME", 0)
v_Fqdn_Obj.setProperty("FQDN_APPEAR_COUNT", 0)
}
// Get or create the IP vertex
if (graph.getVertices("v_IP.IP", ip).nonEmpty) {
for (v: Vertex <- graph.getVertices("v_IP.IP", ip)) {
v_IP_Obj = v
}
} else {
v_IP_Obj = graph.addVertex("class:v_IP", Nil: _*)
v_IP_Obj.setProperty("FQDN_NAME", fqdn)
v_IP_Obj.setProperty("IP_LOCATION", "")
v_IP_Obj.setProperty("FIRST_FOUND_TIME", 0)
v_IP_Obj.setProperty("LAST_FOUND_TIME", 0)
v_IP_Obj.setProperty("IP_APPEAR_COUNT", 0)
}
// println("e_visit_egde:"+v_Fqdn_Obj.getProperty[String]("FQDN_NAME")+"-"+v_IP_Obj.getProperty[String]("IP"))
// Add or update the edge
for (e: Edge <- v_IP_Obj.getEdges(Direction.OUT)) {
if (e.getVertex(Direction.IN).getProperty[String]("FQDN_NAME") == fqdn){
val cnt = e.getProperty[Long]("COUNT_TOTAL")
e.setProperty("COUNT_TOTAL",e_Cnt+cnt)
e.setProperty("LAST_FOUND_TIME",e_Last)
println("update e_visit_egde:"+fqdn+"-"+ip)
e_Edge_Obj = e
}
}
if (e_Edge_Obj == null){
val newEdge = graph.addEdge(null, v_Fqdn_Obj, v_IP_Obj, "E_VISIT_V_IP_TO_V_FQDN")
newEdge.setProperty("COUNT_TOTAL",e_Cnt)
newEdge.setProperty("FIRST_FOUND_TIME",e_First)
newEdge.setProperty("LAST_FOUND_TIME",e_Last)
println("insert e_visit_egde:"+fqdn+"-"+ip)
}
// graph.commit()
})
/*
v_FQDN_DF.printSchema()
v_FQDN_DF.coalesce(20).foreach(row => {
val fqdn = row.getAs[String](0)
val first = row.getAs[Long](1)
val last = row.getAs[Long](2)
val count = row.getAs[Long](3)
val session = pool.acquire()
val vertex = session.newVertex("v_FQDN")
vertex.setProperty("FQDN_NAME",fqdn)
vertex.setProperty("FIRST_FOUND_TIME", first)
vertex.setProperty("LAST_FOUND_TIME", last)
vertex.setProperty("FQDN_APPEAR_COUNT", count)
vertex
})
v_IP_DF.printSchema()
v_IP_DF.coalesce(20).foreach(row => {
val ip = row.getAs[String](0)
val first = row.getAs[Long](2)
val last = row.getAs[Long](3)
val count = row.getAs[Long](4)
val tx: OrientGraph = factory.getTx
val vertex = tx.addVertex("class:v_FQDN",Nil: _*)
vertex.setProperties("FQDN_NAME",ip)
vertex.setProperty("FIRST_FOUND_TIME", first)
vertex.setProperty("LAST_FOUND_TIME", last)
vertex.setProperty("FQDN_APPEAR_COUNT", count)
tx.commit()
})
e_Address_v_FQDN_to_v_IP_DF.printSchema()
e_Address_v_FQDN_to_v_IP_DF.foreach(row => {
val fqdn = row.getAs[String](0)
val ip = row.getAs[String](2)
val first = row.getAs[Long](3)
val last = row.getAs[Long](4)
val count = row.getAs[Long](5)
val session = pool.acquire()
val tx: OrientGraph = factory.getTx
tx.getFeatures.supportsVertexProperties
val vertex: OrientVertex = tx.getVertex()
tx.addEdge(null,vertex,vertex,"")
})
*/
}
}

View File

@@ -0,0 +1,29 @@
package cn.ac.iie.test
import com.arangodb.entity.DocumentField
import com.arangodb.entity.DocumentField.Type
import scala.beans.BeanProperty
class TestBaseEdgeDocument {
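// Bean used to deserialize ArangoDB edge documents: the @DocumentField
// annotations map from/to/key/id onto ArangoDB's system attributes
// (_from, _to, _key, _id), while the remaining @BeanProperty fields are
// read as ordinary document attributes.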
@BeanProperty
@DocumentField(Type.FROM)
var from: String=""
@BeanProperty
@DocumentField(Type.TO)
var to: String=""
@BeanProperty
@DocumentField(Type.KEY)
var key: String=""
@BeanProperty
@DocumentField(Type.ID)
var id: String=""
@BeanProperty
var FIRST_FOUND_TIME:Long = 0
@BeanProperty
var LAST_FOUND_TIME:Long = 0
@BeanProperty
var COUNT_TOTAL:Long = 0
}

View File

@@ -0,0 +1,35 @@
package cn.ac.iie.test
import cn.ac.iie.dao.{BaseMediaDataLoad, UpdateArangoGraphByDF}
import cn.ac.iie.utils.InitArangoDBPool
import com.arangodb.entity.BaseEdgeDocument
import com.arangodb.util.MapBuilder
import com.arangodb.{ArangoCursor, ArangoDB}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
object TestBaseEdgeDocumentDataFrame {
@transient
var arangoDB: ArangoDB = _
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.appName("test")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.network.timeout", "300s")
.config("spark.sql.shuffle.partitions", Config.SPARK_SQL_SHUFFLE_PARTITIONS)
.config("spark.executor.memory", Config.SPARK_EXECUTOR_MEMORY)
.master(Config.MASTER)
.getOrCreate()
BaseMediaDataLoad.loadMediaDate(spark)
val e_Address_v_FQDN_to_v_IP_DF = BaseMediaDataLoad.getFQDNAddressIPEdgeFromMedia(spark)
UpdateArangoGraphByDF.updateFQDNAddressIPEdge(e_Address_v_FQDN_to_v_IP_DF,spark)
InitArangoDBPool.arangoDB.shutdown()
spark.close()
}
}

View File

@@ -0,0 +1,219 @@
package cn.ac.iie.test
import com.arangodb.entity.{BaseDocument, BaseEdgeDocument}
import com.arangodb.util.MapBuilder
import com.arangodb.{ArangoCursor, ArangoDB}
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.util.Try
object TestIndices {
@transient
var arangoDB: ArangoDB = _
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.appName("test")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.network.timeout", "300s")
.config("spark.sql.shuffle.partitions", Config.SPARK_SQL_SHUFFLE_PARTITIONS)
.config("spark.executor.memory", Config.SPARK_EXECUTOR_MEMORY)
.master(Config.MASTER)
.getOrCreate()
val mediaDataFrame: DataFrame = spark.read.format("jdbc")
.option("url", "jdbc:clickhouse://192.168.40.193:8123")
.option("dbtable", s"(select media_domain,recv_time,s1_s_ip,s1_d_ip,s1_s_location_region,s1_d_location_region from av_miner.media_expire_patch where recv_time>=${Config.MINTIME} and recv_time<=${Config.MAXTIME})")
.option("driver", "ru.yandex.clickhouse.ClickHouseDriver")
.option("user", "default")
.option("password", "111111")
.option("numPartitions", Config.NUMPARTITIONS)
.option("partitionColumn", "recv_time")
.option("lowerBound", Config.MINTIME)
.option("upperBound", Config.MAXTIME)
.option("fetchsize", Config.SPARK_SQL_READ_FETCHSIZE)
.load()
mediaDataFrame.printSchema()
mediaDataFrame.createOrReplaceGlobalTempView("media_expire_patch")
val v_FQDN_DF = spark.sql(
"""
|SELECT
| media_domain AS FQDN_NAME,
| MIN( recv_time ) AS FQDN_FIRST_FOUND_TIME,
| MAX( recv_time ) AS FQDN_LAST_FOUND_TIME,
| COUNT( * ) AS FQDN_COUNT_TOTAL
|FROM
| global_temp.media_expire_patch
|WHERE
| media_domain != ''
|GROUP BY
| media_domain
""".stripMargin
)
val time1 = System.currentTimeMillis()
arangoDB = new ArangoDB.Builder()
.maxConnections(Config.MAXPOOLSIZE)
.host("192.168.40.127", 8529)
.user("root")
.password("111111")
.build
val dbName = "insert_iplearn_index"
val collectionName = "V_FQDN"
val query = "FOR doc IN " + collectionName + " RETURN doc"
val bindVars = new MapBuilder().get
val cursor: ArangoCursor[BaseEdgeDocument] = arangoDB.db(dbName).query(query, bindVars, null, classOf[BaseEdgeDocument])
var cursor_Map = scala.collection.mutable.HashMap[String,BaseEdgeDocument]()
while (cursor.hasNext){
val document = cursor.next()
cursor_Map += (document.getKey -> document)
}
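// cursor_Map now holds every existing V_FQDN document keyed by _key, so the
// rows below can be split into inserts vs. updates without per-row queries.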
val time2 = System.currentTimeMillis()
println((time2 - time1)/1000)
val docs_Insert = new java.util.ArrayList[BaseDocument]()
val docs_Update = new java.util.ArrayList[BaseDocument]()
v_FQDN_DF.foreach(row => {
val fqdn = row.getAs[String]("FQDN_NAME")
val v_Fqdn_First = row.getAs[Long]("FQDN_FIRST_FOUND_TIME")
val v_Fqdn_Last = row.getAs[Long]("FQDN_LAST_FOUND_TIME")
val v_Fqdn_Cnt = row.getAs[Long]("FQDN_COUNT_TOTAL")
val doc = cursor_Map.getOrElse(fqdn, null)
if (doc != null) {
val document: BaseDocument = doc
val fqdn_Cnt = Try(document.getAttribute("FQDN_COUNT_TOTAL").toString.toInt).getOrElse(0)
document.addAttribute("FQDN_COUNT_TOTAL", fqdn_Cnt + v_Fqdn_Cnt)
document.addAttribute("FQDN_LAST_FOUND_TIME", v_Fqdn_Last)
docs_Update.add(document)
} else {
val baseDocument: BaseDocument = new BaseDocument()
baseDocument.setKey(fqdn)
baseDocument.addAttribute("FQDN_NAME", fqdn)
baseDocument.addAttribute("FQDN_FIRST_FOUND_TIME", v_Fqdn_First)
baseDocument.addAttribute("FQDN_LAST_FOUND_TIME", v_Fqdn_Last)
baseDocument.addAttribute("FQDN_COUNT_TOTAL", v_Fqdn_Cnt)
docs_Insert.add(baseDocument)
}
})
// Try(v_FQDN_Coll.importDocuments(docs_Insert))
// Try(v_FQDN_Coll.updateDocuments(docs_Update))
/*
val db = arangoDB.db("insert_iplearn_index")
val coll = db.collection("E_ADDRESS_V_FQDN_TO_V_IP")
val docs = new java.util.ArrayList[BaseEdgeDocument]
val baseEdgeDocument2 = new BaseEdgeDocument
baseEdgeDocument2.setKey("test_edge_2.com")
baseEdgeDocument2.setFrom("V_FQDN/test_edge_2_from")
baseEdgeDocument2.setTo("V_IP/test_edge_2_to")
baseEdgeDocument2.addAttribute("e_add_test_str", "1Two3")
baseEdgeDocument2.addAttribute("e_add_test_num", 4321)
docs.add(baseEdgeDocument2)
coll.importDocuments(docs)
arangoDB.shutdown()
*/
/*
val uri: String = "remote:192.168.40.127/iplearning-insert"
val pool = new OPartitionedDatabasePool(uri, "root", "111111", 5, 5)
factory = new OrientGraphFactory(uri, "root", "111111", pool)
val graph = factory.getNoTx
val ip = "23.224.224.163"
import scala.collection.JavaConversions._
/*
for (v: Vertex <- graph.getVertices("v_IP.IP", ip)) {
val update_IP_Last = v.getProperty[Long]("LAST_FOUND_TIME")
val update_IP_Cnt = v.getProperty[Long]("IP_APPEAR_COUNT")
val sqlComm = new OCommandSQL(
s"UPDATE v_IP SET LAST_FOUND_TIME = $update_IP_Last,FQDN_APPEAR_COUNT = 100 "
+ s"WHERE IP == '$ip'")
Try(graph.command(sqlComm).execute())
println("update ip:" + ip)
}
*/
val v_IP_Obj = graph.addVertex("class:v_IP", Nil: _*)
v_IP_Obj.setProperty("IP", ip)
v_IP_Obj.setProperty("IP_LOCATION", "fas")
v_IP_Obj.setProperty("FIRST_FOUND_TIME", 1)
v_IP_Obj.setProperty("LAST_FOUND_TIME", 1)
v_IP_Obj.setProperty("IP_APPEAR_COUNT", 1)
*/
/*
val spark: SparkSession = SparkSession
.builder()
.appName("test")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.network.timeout", "300s")
.config("spark.sql.shuffle.partitions", Config.SPARK_SQL_SHUFFLE_PARTITIONS)
.config("spark.executor.memory", Config.SPARK_EXECUTOR_MEMORY)
.config("arangodb.hosts", "192.168.40.127:8529")
.config("arangodb.user", "root")
.config("arangodb.password", "111111")
.master(Config.MASTER)
.getOrCreate()
val value: ArangoRDD[BaseDocument] = ArangoSpark
.load[BaseDocument](spark.sparkContext,"V_FQDN",ReadOptions("insert_iplearn_index"))
// var stringToDocument: Map[String, BaseDocument] = Map[String,BaseDocument]()
val lstBuffer: ListBuffer[(String, BaseDocument)] = ListBuffer[(String, BaseDocument)]()
val map: Map[String, BaseDocument] = value.map(doc => (doc.getKey,doc)).collect().toMap
println(map.size)
spark.close()
*/
/*
arangoDB = new ArangoDB.Builder()
.maxConnections(10)
.host("192.168.40.127", 8529)
.user("root")
.password("111111")
.build
val db = arangoDB.db("insert_iplearn_index")
// db.createCollection("V_FQDN")
// db.createCollection("V_IP")
// db.createCollection("E_ADDRESS_V_FQDN_TO_V_IP")
// db.createCollection("E_VISIT_V_IP_TO_V_FQDN")
val v_FQDN_Coll = db.collection("E_VISIT_V_IP_TO_V_FQDN")
*/
// val coll: ArangoCollection = db.collection("V_FQDN")
// val value = coll.getDocument("test1.com",classOf[BaseDocument])
// val str = value.getAttribute("v_fqdn_test_str")
// val num: Int = value.getAttribute("v_fqdn_test_num").toString.toInt
// println(str+"-"+num)
/*
val docs = new util.ArrayList[BaseDocument]
val baseDocument1 = new BaseDocument
baseDocument1.setKey("test1.com")
baseDocument1.addAttribute("v_fqdn_test_str", "one2three")
baseDocument1.addAttribute("v_fqdn_test_num", 1234)
docs.add(baseDocument1)
val baseDocument2 = new BaseDocument
baseDocument2.setKey("test2.com")
baseDocument2.addAttribute("v_fqdn_test_str", "1Two3")
baseDocument2.addAttribute("v_fqdn_test_num", 4321)
docs.add(baseDocument2)
coll.importDocuments(docs)
*/
// arangoDB.shutdown()
}
}

View File

@@ -0,0 +1,56 @@
package cn.ac.iie.test
import cn.ac.iie.dao.BaseMediaDataLoad
import cn.ac.iie.etl.CursorTransform
import cn.ac.iie.pojo.BaseVertexFqdn
import cn.ac.iie.utils.InitArangoDBPool
import com.arangodb.ArangoCursor
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
object TestSparkJoin {
private val logger: Logger = LoggerFactory.getLogger(TestSparkJoin.getClass)
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.appName("test")
.config("spark.serializer", Config.SPARK_SERIALIZER)
.config("spark.network.timeout", "300s")
.config("spark.sql.shuffle.partitions", Config.SPARK_SQL_SHUFFLE_PARTITIONS)
.config("spark.executor.memory", Config.SPARK_EXECUTOR_MEMORY)
.master(Config.MASTER)
.getOrCreate()
logger.warn("SparkSession obtained successfully")
BaseMediaDataLoad.loadMediaDate(spark)
// val v_FQDN_DF = BaseMediaDataLoad.getFQDNVertexFromMedia(spark)
BaseMediaDataLoad.getFQDNAddressIPEdgeFromMedia(spark).show(10)
// v_FQDN_DF.printSchema()
/*
val arangoDB = InitArangoDBPool.arangoDB
val options = InitArangoDBPool.options
val bindVars = InitArangoDBPool.bindVars
// val v_FQDN_Query = "FOR doc IN V_FQDN limit 100 RETURN doc"
// val v_FQDN_Query = "FOR doc IN V_FQDN RETURN doc"
// val v_FQDN_Cursor: ArangoCursor[BaseVertexFqdn] = arangoDB.db("insert_iplearn_noindex")
// .query(v_FQDN_Query, bindVars, options, classOf[BaseVertexFqdn])
// val v_FQDN_Curson_DF: DataFrame = spark.createDataFrame(v_FQDN_Cursor.asListRemaining(),classOf[BaseVertexFqdn])
// v_FQDN_Curson_DF.printSchema()
//
val v_FQDN_Curson_DF = CursorTransform.cursorToDataFrame("V_FQDN",classOf[BaseVertexFqdn],spark)
val v_Fqdn_Join_Df = v_FQDN_DF.join(v_FQDN_Curson_DF,v_FQDN_DF("new_fqdn_name")===v_FQDN_Curson_DF("key"),"fullouter")
v_Fqdn_Join_Df.printSchema()
v_Fqdn_Join_Df
// .filter(row => row.getAs[String]("new_fqdn_name")!=null)
// .filter(row => row.getAs[String]("new_fqdn_name")!=null)
.show(300)
arangoDB.shutdown()
spark.close()
*/
}
}

View File

@@ -0,0 +1,34 @@
package cn.ac.iie.utils
import com.typesafe.config.{Config, ConfigFactory}
object ConfigUtils {
private lazy val config: Config = ConfigFactory.load()
val SPARK_SQL_SHUFFLE_PARTITIONS: String = config.getString("spark.sql.shuffle.partitions")
val SPARK_SQL_READ_FETCHSIZE: String = config.getString("spark.sql.read.fetchsize")
val SPARK_EXECUTOR_MEMORY: String = config.getString("spark.executor.memory")
val SPARK_APP_NAME: String = config.getString("spark.app.name")
val SPARK_NETWORK_TIMEOUT: String = config.getString("spark.network.timeout")
val REPARTITION_NUMBER: Int = config.getInt("repartitionNumber")
val SPARK_SERIALIZER: String = config.getString("spark.serializer")
val NUMPARTITIONS: String = config.getString("numPartitions")
val MASTER: String = config.getString("master")
val MAXPOOLSIZE: Int = config.getInt("maxPoolSize")
val MINTIME: String = config.getString("minTime")
val MAXTIME: String = config.getString("maxTime")
val ARANGODB_HOST: String= config.getString("arangoDB.host")
val ARANGODB_PORT: Int = config.getInt("arangoDB.port")
val ARANGODB_USER: String= config.getString("arangoDB.user")
val ARANGODB_PASSWORD:String= config.getString("arangoDB.password")
val ARANGODB_BATCH: Int = config.getInt("arangoDB.batch")
val ARANGODB_DB_NAME:String= config.getString("arangoDB.DB.name")
val ARANGODB_TTL: Int = config.getInt("arangoDB.ttl")
val CLICKHOUSE_SOCKET_TIMEOUT: Int = config.getInt("clickhouse.socket.timeout")
val THREAD_POOL_NUMBER: Int = config.getInt("thread.pool.number")
}

View File

@@ -0,0 +1,5 @@
package cn.ac.iie.utils
object DateTimeUtils {
}

View File

@@ -0,0 +1,24 @@
package cn.ac.iie.utils
import java.util
import com.arangodb.ArangoDB
import com.arangodb.model.AqlQueryOptions
import com.arangodb.util.MapBuilder
object InitArangoDBPool {
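// Single lazily-built ArangoDB client; @transient keeps the live driver handle
// out of serialized Spark closures so each executor re-creates it on first use.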
@transient
lazy val arangoDB: ArangoDB = new ArangoDB.Builder()
.maxConnections(ConfigUtils.MAXPOOLSIZE)
.host(ConfigUtils.ARANGODB_HOST, ConfigUtils.ARANGODB_PORT)
.user(ConfigUtils.ARANGODB_USER)
.password(ConfigUtils.ARANGODB_PASSWORD)
.build
val bindVars: util.Map[String, AnyRef] = new MapBuilder().get
val options: AqlQueryOptions = new AqlQueryOptions()
.ttl(ConfigUtils.ARANGODB_TTL)
}