diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala
index 4cdbc14..ce8b58b 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala
@@ -150,6 +150,7 @@ object BaseClickhouseData {
       | AND radius_packet_type = 4
       | AND radius_acct_status_type = 1
     """.stripMargin
+
   val sql =
     s"""
       |(
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala
index 119ec18..352a650 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala
@@ -1,7 +1,5 @@
 package cn.ac.iie.service.transform
 
-import java.util.regex.Pattern
-
 import cn.ac.iie.config.ApplicationConfig
 import cn.ac.iie.dao.{BaseArangoData, BaseClickhouseData}
 import cn.ac.iie.spark.partition.CustomPartitioner
@@ -12,15 +10,17 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.functions._
 import org.slf4j.LoggerFactory
 
+import java.util.regex.Pattern
+
 object MergeDataFrame {
   private val LOG = LoggerFactory.getLogger(MergeDataFrame.getClass)
   private val pattern = Pattern.compile("^[\\d]*$")
 
   def mergeVertexFqdn(): RDD[(String, (Option[BaseDocument], Row))] = {
     val fqdnRddRow: RDD[(String, Row)] = BaseClickhouseData.getVertexFqdnDf
-      .repartition().rdd.filter(row => isDomain(row.getAs[String](0))).map(row => {
+      .repartition().rdd.filter(row => isDomain(row.getAs[String](0))).map(row => {
       (row.getAs[String]("FQDN"), row)
-    })/*.partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS))*/
+    }) /*.partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS))*/
 
     val fqdnRddDoc: ArangoRdd[BaseDocument] = BaseArangoData.loadArangoRdd[BaseDocument]("FQDN")
 
@@ -29,18 +29,20 @@ object MergeDataFrame {
 
   def mergeVertexIp(): RDD[(String, (Option[BaseDocument], Row))] = {
     val vertexIpDf = BaseClickhouseData.getVertexIpDf
-    val frame = vertexIpDf.repartition().groupBy("IP","VSYS_ID").agg(
+    val frame = vertexIpDf.repartition().groupBy("IP", "VSYS_ID").agg(
       min("FIRST_FOUND_TIME").alias("FIRST_FOUND_TIME"),
       max("LAST_FOUND_TIME").alias("LAST_FOUND_TIME"),
       collect_list("SESSION_COUNT").alias("SESSION_COUNT_LIST"),
       collect_list("BYTES_SUM").alias("BYTES_SUM_LIST"),
       collect_list("ip_type").alias("ip_type_list"),
-      last("common_link_info").alias("common_link_info"),
-      last("VSYS_ID").alias("VSYS_ID")
+      last("common_link_info").alias("common_link_info")
     )
+
     val ipRddRow = frame.rdd.map(row => {
-      (row.getAs[String]("IP"), row)
-    })/*.partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS))*/
+      val vsysId = row.getAs[Long]("VSYS_ID")
+      val ip = row.getAs[String]("IP")
+      ( ip + "-" + vsysId, row)
+    }) /*.partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS))*/
 
     val ipRddDoc = BaseArangoData.loadArangoRdd[BaseDocument]("IP")
     ipRddDoc.map(doc => (doc.getKey, doc)).rightOuterJoin(ipRddRow)
@@ -55,15 +57,15 @@
       max("LAST_FOUND_TIME").alias("LAST_FOUND_TIME"),
       collect_list("COUNT_TOTAL").alias("COUNT_TOTAL_LIST"),
       collect_list("schema_type").alias("schema_type_list"),
-      collect_set("DIST_CIP_RECENT").alias("DIST_CIP_RECENT"),
-      last("VSYS_ID").alias("VSYS_ID")
+      collect_set("DIST_CIP_RECENT").alias("DIST_CIP_RECENT")
     )
     val fqdnLocIpRddRow = frame.rdd.map(row => {
       val fqdn = row.getAs[String]("FQDN")
       val serverIp = row.getAs[String]("common_server_ip")
-      val key = fqdn.concat("-" + serverIp)
+      val vsysId = row.getAs[Long]("VSYS_ID")
+      val key = fqdn.concat("-" + serverIp + "-" + vsysId)
       (key, row)
-    })/*.partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS))*/
+    }) /*.partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS))*/
 
     val fqdnLocIpRddDoc = BaseArangoData.loadArangoRdd[BaseEdgeDocument]("R_LOCATE_FQDN2IP")
     fqdnLocIpRddDoc.map(doc => (doc.getKey, doc)).rightOuterJoin(fqdnLocIpRddRow)
@@ -75,7 +77,8 @@ object MergeDataFrame {
       .rdd.map(row => {
       val commonSubscriberId = row.getAs[String]("common_subscriber_id")
       val ip = row.getAs[String]("radius_framed_ip")
-      val key = commonSubscriberId.concat("-" + ip)
+      val vsysId = row.getAs[Long]("VSYS_ID")
+      val key = commonSubscriberId.concat("-" + ip + "-" + vsysId)
       (key, row)
     }).partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS))
     val subidLocIpRddDoc = BaseArangoData.loadArangoRdd[BaseEdgeDocument]("R_LOCATE_SUBSCRIBER2IP")
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
index 1a920a3..d767ca2 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
@@ -120,8 +120,9 @@ object UpdateDocument {
 
   private def getVertexFrameipRow(row: Row): BaseDocument = {
     val ip = row.getAs[String]("radius_framed_ip")
+    val vsysId = row.getAs[Long]("VSYS_ID")
     val document = new BaseDocument()
-    document.setKey(ip)
+    document.setKey(ip + "-" + vsysId)
     document.addAttribute("IP", ip)
     document
   }
@@ -141,7 +142,7 @@ object UpdateDocument {
     val firstFoundTime = subidLocIpRow.getAs[Long]("FIRST_FOUND_TIME")
     val vsysId = subidLocIpRow.getAs[Long]("VSYS_ID")
 
-    val key = subId.concat("-" + ip)
+    val key = subId.concat("-" + ip + "-" + vsysId)
     if (subidLocIpDoc != null) {
       updateMaxAttribute(subidLocIpDoc, lastFoundTime, "LAST_FOUND_TIME")
       subidLocIpDoc.addAttribute("VSYS_ID", vsysId)
@@ -172,13 +173,14 @@ object UpdateDocument {
     val subLastFoundTime = subidRow.getAs[Long]("LAST_FOUND_TIME")
     val subFirstFoundTime = subidRow.getAs[Long]("FIRST_FOUND_TIME")
     val vsysId = subidRow.getAs[Long]("VSYS_ID")
+    val key = subId.concat("-" + vsysId)
 
     if (subidDoc != null) {
       updateMaxAttribute(subidDoc, subLastFoundTime, "LAST_FOUND_TIME")
       subidDoc.addAttribute("VSYS_ID", vsysId)
     } else {
       subidDoc = new BaseDocument()
-      subidDoc.setKey(subId)
+      subidDoc.setKey(key)
       subidDoc.addAttribute("SUBSCRIBER", subId)
       subidDoc.addAttribute("FIRST_FOUND_TIME", subFirstFoundTime)
       subidDoc.addAttribute("LAST_FOUND_TIME", subLastFoundTime)
@@ -206,7 +208,7 @@ object UpdateDocument {
       fqdnDoc.addAttribute("VSYS_ID", vsysId)
     } else {
       fqdnDoc = new BaseDocument
-      fqdnDoc.setKey(fqdn)
+      fqdnDoc.setKey(fqdn + "-" + vsysId)
       fqdnDoc.addAttribute("FQDN_NAME", fqdn)
       fqdnDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime)
       fqdnDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime)
@@ -244,7 +246,7 @@ object UpdateDocument {
       ipDoc.addAttribute("VSYS_ID", vsysId)
     } else {
       ipDoc = new BaseDocument
-      ipDoc.setKey(ip)
+      ipDoc.setKey(ip + "-" + vsysId)
       ipDoc.addAttribute("IP", ip)
       ipDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime)
       ipDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime)
@@ -282,7 +284,7 @@ object UpdateDocument {
     val sepAttritubeMap: Map[String, Long] = separateAttributeByProtocol(schemaTypeList, countTotalList)
     val distinctIp: Array[String] = mergeDistinctIp(distCipRecent)
 
-    val key = fqdn.concat("-" + serverIp)
+    val key = fqdn.concat("-" + serverIp + "-" + vsysId)
     if (fqdnLocIpDoc != null) {
       updateMaxAttribute(fqdnLocIpDoc, lastFoundTime, "LAST_FOUND_TIME")
       updateProtocolAttritube(fqdnLocIpDoc, sepAttritubeMap)