TSG-11902

Author: zhanghongqing
Date:   2022-09-20 11:08:54 +08:00
parent 4bc5b75994
commit 1bf1045223
3 changed files with 26 additions and 20 deletions


@@ -150,6 +150,7 @@ object BaseClickhouseData {
         | AND radius_packet_type = 4
         | AND radius_acct_status_type = 1
       """.stripMargin
+    val sql =
       s"""
         |(
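The diff view truncates this hunk: only the tail of one query-filter string and the head of the added val sql are visible. The shape being built, sketched with a hypothetical table name ("radius_log" is invented, since the real query body is not shown):

    // Sketch only: a filter fragment built with stripMargin is spliced
    // into a query wrapped in parentheses, the usual shape for handing
    // a subquery to a JDBC-style reader.
    val radiusFilter =
      s"""
         | AND radius_packet_type = 4
         | AND radius_acct_status_type = 1
       """.stripMargin
    val sql =
      s"""
         |(
         |  SELECT * FROM radius_log WHERE 1 = 1 $radiusFilter
         |) AS radius
       """.stripMargin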


@@ -1,7 +1,5 @@
 package cn.ac.iie.service.transform
-import java.util.regex.Pattern
 import cn.ac.iie.config.ApplicationConfig
 import cn.ac.iie.dao.{BaseArangoData, BaseClickhouseData}
 import cn.ac.iie.spark.partition.CustomPartitioner
@@ -12,15 +10,17 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.functions._
 import org.slf4j.LoggerFactory
+import java.util.regex.Pattern
 object MergeDataFrame {
   private val LOG = LoggerFactory.getLogger(MergeDataFrame.getClass)
   private val pattern = Pattern.compile("^[\\d]*$")
   def mergeVertexFqdn(): RDD[(String, (Option[BaseDocument], Row))] = {
     val fqdnRddRow: RDD[(String, Row)] = BaseClickhouseData.getVertexFqdnDf
-      .repartition().rdd.filter(row => isDomain(row.getAs[String](0))).map(row => {
+      .repartition().rdd.filter(row => isDomain(row.getAs[String](0))).map(row => {
       (row.getAs[String]("FQDN"), row)
-    })/*.partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS))*/
+    }) /*.partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS))*/
     val fqdnRddDoc: ArangoRdd[BaseDocument] = BaseArangoData.loadArangoRdd[BaseDocument]("FQDN")
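mergeVertexFqdn runs every row through isDomain before keying it by FQDN. isDomain's body is not part of this diff; given the pattern field compiled above (^[\d]*$ matches empty or all-digit strings), a plausible sketch is that purely numeric values are rejected:

    // Assumption: not the project's actual method, reconstructed from
    // the pattern field. A value counts as a domain when it is non-null,
    // non-empty, and not a purely numeric string.
    private def isDomain(value: String): Boolean =
      value != null && value.nonEmpty && !pattern.matcher(value).matches()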
@@ -29,18 +29,20 @@ object MergeDataFrame {
   def mergeVertexIp(): RDD[(String, (Option[BaseDocument], Row))] = {
     val vertexIpDf = BaseClickhouseData.getVertexIpDf
-    val frame = vertexIpDf.repartition().groupBy("IP","VSYS_ID").agg(
+    val frame = vertexIpDf.repartition().groupBy("IP", "VSYS_ID").agg(
       min("FIRST_FOUND_TIME").alias("FIRST_FOUND_TIME"),
       max("LAST_FOUND_TIME").alias("LAST_FOUND_TIME"),
       collect_list("SESSION_COUNT").alias("SESSION_COUNT_LIST"),
       collect_list("BYTES_SUM").alias("BYTES_SUM_LIST"),
       collect_list("ip_type").alias("ip_type_list"),
-      last("common_link_info").alias("common_link_info"),
-      last("VSYS_ID").alias("VSYS_ID")
+      last("common_link_info").alias("common_link_info")
     )
     val ipRddRow = frame.rdd.map(row => {
-      (row.getAs[String]("IP"), row)
-    })/*.partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS))*/
+      val vsysId = row.getAs[Long]("VSYS_ID")
+      val ip = row.getAs[String]("IP")
+      (ip + "-" + vsysId, row)
+    }) /*.partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS))*/
     val ipRddDoc = BaseArangoData.loadArangoRdd[BaseDocument]("IP")
     ipRddDoc.map(doc => (doc.getKey, doc)).rightOuterJoin(ipRddRow)
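With the row key now the composite "IP-VSYS_ID", the right outer join above only pairs rows with ArangoDB documents whose _key carries the same vsys suffix. The join shape in isolation, with the types used in this file:

    // Right outer join keeps every ClickHouse row: unmatched keys arrive
    // as (key, (None, row)) and become inserts downstream, while matched
    // keys arrive as (key, (Some(doc), row)) and become updates.
    val merged: RDD[(String, (Option[BaseDocument], Row))] =
      ipRddDoc.map(doc => (doc.getKey, doc)).rightOuterJoin(ipRddRow)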
@@ -55,15 +57,15 @@ object MergeDataFrame {
       max("LAST_FOUND_TIME").alias("LAST_FOUND_TIME"),
       collect_list("COUNT_TOTAL").alias("COUNT_TOTAL_LIST"),
       collect_list("schema_type").alias("schema_type_list"),
-      collect_set("DIST_CIP_RECENT").alias("DIST_CIP_RECENT"),
-      last("VSYS_ID").alias("VSYS_ID")
+      collect_set("DIST_CIP_RECENT").alias("DIST_CIP_RECENT")
     )
     val fqdnLocIpRddRow = frame.rdd.map(row => {
       val fqdn = row.getAs[String]("FQDN")
       val serverIp = row.getAs[String]("common_server_ip")
-      val key = fqdn.concat("-" + serverIp)
+      val vsysId = row.getAs[Long]("VSYS_ID")
+      val key = fqdn.concat("-" + serverIp + "-" + vsysId)
       (key, row)
-    })/*.partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS))*/
+    }) /*.partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS))*/
     val fqdnLocIpRddDoc = BaseArangoData.loadArangoRdd[BaseEdgeDocument]("R_LOCATE_FQDN2IP")
     fqdnLocIpRddDoc.map(doc => (doc.getKey, doc)).rightOuterJoin(fqdnLocIpRddRow)
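R_LOCATE_FQDN2IP is loaded as BaseEdgeDocument, so each record is an ArangoDB edge. A hedged sketch of how such an edge could be keyed under the new scheme; the values and the _from/_to handles are illustrative assumptions, not taken from this diff:

    import com.arangodb.entity.BaseEdgeDocument

    // Invented values; FQDN and IP are the vertex collections loaded
    // above, whose keys now also carry the "-vsysId" suffix.
    val edge = new BaseEdgeDocument()
    edge.setKey("example.com" + "-" + "203.0.113.7" + "-" + 1L)
    edge.setFrom("FQDN/" + "example.com" + "-" + 1L)
    edge.setTo("IP/" + "203.0.113.7" + "-" + 1L)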
@@ -75,7 +77,8 @@ object MergeDataFrame {
       .rdd.map(row => {
       val commonSubscriberId = row.getAs[String]("common_subscriber_id")
       val ip = row.getAs[String]("radius_framed_ip")
-      val key = commonSubscriberId.concat("-" + ip)
+      val vsysId = row.getAs[Long]("VSYS_ID")
+      val key = commonSubscriberId.concat("-" + ip + "-" + vsysId)
       (key, row)
     }).partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS))
     val subidLocIpRddDoc = BaseArangoData.loadArangoRdd[BaseEdgeDocument]("R_LOCATE_SUBSCRIBER2IP")
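This is the one RDD in the file whose partitionBy call is not commented out. CustomPartitioner's implementation lives outside this commit; a minimal hash-based stand-in with the same constructor shape, offered only as a sketch:

    import org.apache.spark.Partitioner

    // Sketch, not the project's actual class: partitions records by a
    // non-negative hash of the composite string key ("subId-ip-vsysId"),
    // so all records for one edge land in the same partition.
    class CustomPartitioner(partitions: Int) extends Partitioner {
      require(partitions > 0, "partitions must be positive")
      override def numPartitions: Int = partitions
      override def getPartition(key: Any): Int = {
        val mod = key.hashCode % numPartitions
        if (mod < 0) mod + numPartitions else mod
      }
    }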


@@ -120,8 +120,9 @@ object UpdateDocument {
   private def getVertexFrameipRow(row: Row): BaseDocument = {
     val ip = row.getAs[String]("radius_framed_ip")
+    val vsysId = row.getAs[Long]("VSYS_ID")
     val document = new BaseDocument()
-    document.setKey(ip)
+    document.setKey(ip + "-" + vsysId)
     document.addAttribute("IP", ip)
     document
   }
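Suffixing the key with VSYS_ID is the point of the whole commit: the same framed IP can occur in several virtual systems, and a bare IP key made those rows overwrite one another in ArangoDB. With invented values:

    // Two rows, same IP, different virtual systems. Old behaviour: both
    // documents get _key "10.0.0.1" and the second upsert clobbers the
    // first. New behaviour: two distinct vertices.
    val a = new BaseDocument()
    a.setKey("10.0.0.1" + "-" + 1L)   // vsys 1 -> _key "10.0.0.1-1"
    val b = new BaseDocument()
    b.setKey("10.0.0.1" + "-" + 2L)   // vsys 2 -> _key "10.0.0.1-2"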
@@ -141,7 +142,7 @@ object UpdateDocument {
     val firstFoundTime = subidLocIpRow.getAs[Long]("FIRST_FOUND_TIME")
     val vsysId = subidLocIpRow.getAs[Long]("VSYS_ID")
-    val key = subId.concat("-" + ip)
+    val key = subId.concat("-" + ip + "-" + vsysId)
     if (subidLocIpDoc != null) {
       updateMaxAttribute(subidLocIpDoc, lastFoundTime, "LAST_FOUND_TIME")
       subidLocIpDoc.addAttribute("VSYS_ID", vsysId)
@@ -172,13 +173,14 @@ object UpdateDocument {
     val subLastFoundTime = subidRow.getAs[Long]("LAST_FOUND_TIME")
     val subFirstFoundTime = subidRow.getAs[Long]("FIRST_FOUND_TIME")
     val vsysId = subidRow.getAs[Long]("VSYS_ID")
+    val key = subId.concat("-" + vsysId)
     if (subidDoc != null) {
       updateMaxAttribute(subidDoc, subLastFoundTime, "LAST_FOUND_TIME")
       subidDoc.addAttribute("VSYS_ID", vsysId)
     } else {
       subidDoc = new BaseDocument()
-      subidDoc.setKey(subId)
+      subidDoc.setKey(key)
       subidDoc.addAttribute("SUBSCRIBER", subId)
       subidDoc.addAttribute("FIRST_FOUND_TIME", subFirstFoundTime)
       subidDoc.addAttribute("LAST_FOUND_TIME", subLastFoundTime)
@@ -206,7 +208,7 @@ object UpdateDocument {
       fqdnDoc.addAttribute("VSYS_ID", vsysId)
     } else {
       fqdnDoc = new BaseDocument
-      fqdnDoc.setKey(fqdn)
+      fqdnDoc.setKey(fqdn + "-" + vsysId)
       fqdnDoc.addAttribute("FQDN_NAME", fqdn)
       fqdnDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime)
       fqdnDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime)
@@ -244,7 +246,7 @@ object UpdateDocument {
       ipDoc.addAttribute("VSYS_ID", vsysId)
     } else {
       ipDoc = new BaseDocument
-      ipDoc.setKey(ip)
+      ipDoc.setKey(ip + "-" + vsysId)
       ipDoc.addAttribute("IP", ip)
       ipDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime)
       ipDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime)
@@ -282,7 +284,7 @@ object UpdateDocument {
     val sepAttritubeMap: Map[String, Long] = separateAttributeByProtocol(schemaTypeList, countTotalList)
     val distinctIp: Array[String] = mergeDistinctIp(distCipRecent)
-    val key = fqdn.concat("-" + serverIp)
+    val key = fqdn.concat("-" + serverIp + "-" + vsysId)
     if (fqdnLocIpDoc != null) {
       updateMaxAttribute(fqdnLocIpDoc, lastFoundTime, "LAST_FOUND_TIME")
       updateProtocolAttritube(fqdnLocIpDoc, sepAttritubeMap)
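The update branches lean on helpers (updateMaxAttribute, updateProtocolAttritube) whose bodies are outside this diff. Judging from the call sites, updateMaxAttribute keeps the larger of the stored and incoming value; a sketch under that assumption:

    // Assumption: not the project's actual helper, reconstructed from
    // the call sites above. Overwrites attr only when the incoming
    // value exceeds what the document already holds (or when the
    // attribute is absent).
    private def updateMaxAttribute(doc: BaseDocument, incoming: Long, attr: String): Unit = {
      val stored = Option(doc.getAttribute(attr)).map(_.toString.toLong)
      if (stored.forall(incoming > _)) doc.addAttribute(attr, incoming)
    }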