diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala index b568e52..4cdbc14 100644 --- a/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala +++ b/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala @@ -40,18 +40,18 @@ object BaseClickhouseData { val sql = s""" |(SELECT - | FQDN,MAX( LAST_FOUND_TIME ) AS LAST_FOUND_TIME,MIN( FIRST_FOUND_TIME ) AS FIRST_FOUND_TIME + | FQDN,MAX( LAST_FOUND_TIME ) AS LAST_FOUND_TIME,MIN( FIRST_FOUND_TIME ) AS FIRST_FOUND_TIME,VSYS_ID |FROM | ((SELECT - | ssl_sni AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME + | ssl_sni AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME,common_vsys_id AS VSYS_ID | FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_SESSION_TABLE} - | WHERE $where and common_schema_type = 'SSL' GROUP BY ssl_sni + | WHERE $where and common_schema_type = 'SSL' GROUP BY ssl_sni,common_vsys_id | )UNION ALL | (SELECT - | http_host AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME + | http_host AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME,common_vsys_id AS VSYS_ID | FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_SESSION_TABLE} - | WHERE $where and common_schema_type = 'HTTP' GROUP BY http_host)) - |GROUP BY FQDN HAVING FQDN != '') as dbtable + | WHERE $where and common_schema_type = 'HTTP' GROUP BY http_host,common_vsys_id)) + |GROUP BY FQDN,VSYS_ID HAVING FQDN != '') as dbtable """.stripMargin LOG.warn(sql) val frame = initClickhouseData(sql) @@ -70,9 +70,10 @@ object BaseClickhouseData { |SUM(common_c2s_byte_num+common_s2c_byte_num) as BYTES_SUM, |groupUniqArray(2)(common_link_info_c2s)[2] as common_link_info, |'client' as ip_type + |,common_vsys_id AS VSYS_ID |FROM
${ApplicationConfig.SPARK_READ_CLICKHOUSE_SESSION_TABLE} |where $where - |group by common_client_ip) + |group by common_client_ip,common_vsys_id) |UNION ALL |(SELECT common_server_ip AS IP, |MIN(common_recv_time) AS FIRST_FOUND_TIME, @@ -81,9 +82,10 @@ object BaseClickhouseData { |SUM(common_c2s_byte_num+common_s2c_byte_num) as BYTES_SUM, |groupUniqArray(2)(common_link_info_s2c)[2] as common_link_info, |'server' as ip_type + |,common_vsys_id AS VSYS_ID |FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_SESSION_TABLE} |where $where - |group by common_server_ip))) as dbtable + |group by common_server_ip,common_vsys_id))) as dbtable """.stripMargin LOG.warn(sql) val frame = initClickhouseData(sql) @@ -98,14 +100,14 @@ object BaseClickhouseData { s""" |(SELECT * FROM |((SELECT ssl_sni AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL, - |toString(groupUniqArray(${ApplicationConfig.DISTINCT_CLIENT_IP_NUM})(common_client_ip)) AS DIST_CIP_RECENT,'TLS' AS schema_type + |toString(groupUniqArray(${ApplicationConfig.DISTINCT_CLIENT_IP_NUM})(common_client_ip)) AS DIST_CIP_RECENT,'TLS' AS schema_type,common_vsys_id AS VSYS_ID |FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_SESSION_TABLE} - |WHERE $where and common_schema_type = 'SSL' GROUP BY ssl_sni,common_server_ip) + |WHERE $where and common_schema_type = 'SSL' GROUP BY ssl_sni,common_server_ip,common_vsys_id) |UNION ALL |(SELECT http_host AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL, - |toString(groupUniqArray(${ApplicationConfig.DISTINCT_CLIENT_IP_NUM})(common_client_ip)) AS DIST_CIP_RECENT,'HTTP' AS schema_type + |toString(groupUniqArray(${ApplicationConfig.DISTINCT_CLIENT_IP_NUM})(common_client_ip)) AS DIST_CIP_RECENT,'HTTP' AS schema_type,common_vsys_id AS VSYS_ID |FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_SESSION_TABLE} - |WHERE $where and 
common_schema_type = 'HTTP' GROUP BY http_host,common_server_ip)) + |WHERE $where and common_schema_type = 'HTTP' GROUP BY http_host,common_server_ip,common_vsys_id)) |WHERE FQDN != '') as dbtable """.stripMargin LOG.warn(sql) @@ -127,9 +129,9 @@ object BaseClickhouseData { val sql = s""" |( - |SELECT common_subscriber_id,radius_framed_ip,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME + |SELECT common_subscriber_id,radius_framed_ip,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME,common_vsys_id AS VSYS_ID |FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_RADIUS_TABLE} - |WHERE $where GROUP BY common_subscriber_id,radius_framed_ip + |WHERE $where GROUP BY common_subscriber_id,radius_framed_ip,common_vsys_id |) as dbtable """.stripMargin LOG.warn(sql) @@ -151,8 +153,8 @@ object BaseClickhouseData { val sql = s""" |( - |SELECT common_subscriber_id,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_RADIUS_TABLE} - |WHERE $where GROUP BY common_subscriber_id + |SELECT common_subscriber_id,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME,common_vsys_id AS VSYS_ID FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_RADIUS_TABLE} + |WHERE $where GROUP BY common_subscriber_id,common_vsys_id |)as dbtable """.stripMargin LOG.warn(sql) @@ -170,13 +172,12 @@ object BaseClickhouseData { | AND radius_framed_ip != '' | AND radius_packet_type = 4 | AND radius_acct_status_type = 1 - | """.stripMargin val sql = s""" |( - |SELECT radius_framed_ip,MAX(common_recv_time) as LAST_FOUND_TIME FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_RADIUS_TABLE} WHERE $where - |GROUP BY radius_framed_ip + |SELECT radius_framed_ip,MAX(common_recv_time) as LAST_FOUND_TIME,common_vsys_id AS VSYS_ID FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_RADIUS_TABLE} WHERE $where + |GROUP BY radius_framed_ip,common_vsys_id |)as dbtable 
""".stripMargin LOG.warn(sql) diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala index b936697..119ec18 100644 --- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala +++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala @@ -29,13 +29,14 @@ object MergeDataFrame { def mergeVertexIp(): RDD[(String, (Option[BaseDocument], Row))] = { val vertexIpDf = BaseClickhouseData.getVertexIpDf - val frame = vertexIpDf.repartition().groupBy("IP").agg( + val frame = vertexIpDf.repartition().groupBy("IP","VSYS_ID").agg( min("FIRST_FOUND_TIME").alias("FIRST_FOUND_TIME"), max("LAST_FOUND_TIME").alias("LAST_FOUND_TIME"), collect_list("SESSION_COUNT").alias("SESSION_COUNT_LIST"), collect_list("BYTES_SUM").alias("BYTES_SUM_LIST"), collect_list("ip_type").alias("ip_type_list"), - last("common_link_info").alias("common_link_info") + last("common_link_info").alias("common_link_info"), + last("VSYS_ID").alias("VSYS_ID") ) val ipRddRow = frame.rdd.map(row => { (row.getAs[String]("IP"), row) @@ -48,13 +49,14 @@ object MergeDataFrame { def mergeRelationFqdnLocateIp(): RDD[(String, (Option[BaseEdgeDocument], Row))] = { val frame = BaseClickhouseData.getRelationFqdnLocateIpDf .repartition().filter(row => isDomain(row.getAs[String]("FQDN"))) - .groupBy("FQDN", "common_server_ip") + .groupBy("FQDN", "common_server_ip", "VSYS_ID") .agg( min("FIRST_FOUND_TIME").alias("FIRST_FOUND_TIME"), max("LAST_FOUND_TIME").alias("LAST_FOUND_TIME"), collect_list("COUNT_TOTAL").alias("COUNT_TOTAL_LIST"), collect_list("schema_type").alias("schema_type_list"), - collect_set("DIST_CIP_RECENT").alias("DIST_CIP_RECENT") + collect_set("DIST_CIP_RECENT").alias("DIST_CIP_RECENT"), + last("VSYS_ID").alias("VSYS_ID") ) val fqdnLocIpRddRow = frame.rdd.map(row => { val fqdn = row.getAs[String]("FQDN") diff --git 
a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala index d6b0a51..1a920a3 100644 --- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala +++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala @@ -139,10 +139,12 @@ object UpdateDocument { val ip = subidLocIpRow.getAs[String]("radius_framed_ip") val lastFoundTime = subidLocIpRow.getAs[Long]("LAST_FOUND_TIME") val firstFoundTime = subidLocIpRow.getAs[Long]("FIRST_FOUND_TIME") + val vsysId = subidLocIpRow.getAs[Long]("VSYS_ID") val key = subId.concat("-" + ip) if (subidLocIpDoc != null) { updateMaxAttribute(subidLocIpDoc, lastFoundTime, "LAST_FOUND_TIME") + subidLocIpDoc.addAttribute("VSYS_ID", vsysId) } else { subidLocIpDoc = new BaseEdgeDocument() subidLocIpDoc.setKey(key) @@ -152,6 +154,7 @@ object UpdateDocument { subidLocIpDoc.addAttribute("IP", ip) subidLocIpDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime) subidLocIpDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime) + subidLocIpDoc.addAttribute("VSYS_ID", vsysId) } } subidLocIpDoc @@ -168,15 +171,18 @@ object UpdateDocument { val subId = subidRow.getAs[String]("common_subscriber_id") val subLastFoundTime = subidRow.getAs[Long]("LAST_FOUND_TIME") val subFirstFoundTime = subidRow.getAs[Long]("FIRST_FOUND_TIME") + val vsysId = subidRow.getAs[Long]("VSYS_ID") if (subidDoc != null) { updateMaxAttribute(subidDoc, subLastFoundTime, "LAST_FOUND_TIME") + subidDoc.addAttribute("VSYS_ID", vsysId) } else { subidDoc = new BaseDocument() subidDoc.setKey(subId) subidDoc.addAttribute("SUBSCRIBER", subId) subidDoc.addAttribute("FIRST_FOUND_TIME", subFirstFoundTime) subidDoc.addAttribute("LAST_FOUND_TIME", subLastFoundTime) + subidDoc.addAttribute("VSYS_ID", vsysId) } } subidDoc @@ -193,15 +199,18 @@ object UpdateDocument { val fqdn = fqdnRow.getAs[String]("FQDN") val lastFoundTime = 
fqdnRow.getAs[Long]("LAST_FOUND_TIME") val firstFoundTime = fqdnRow.getAs[Long]("FIRST_FOUND_TIME") + val vsysId = fqdnRow.getAs[Long]("VSYS_ID") if (fqdnDoc != null) { updateMaxAttribute(fqdnDoc, lastFoundTime, "LAST_FOUND_TIME") + fqdnDoc.addAttribute("VSYS_ID", vsysId) } else { fqdnDoc = new BaseDocument fqdnDoc.setKey(fqdn) fqdnDoc.addAttribute("FQDN_NAME", fqdn) fqdnDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime) fqdnDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime) + fqdnDoc.addAttribute("VSYS_ID", vsysId) } } fqdnDoc @@ -223,6 +232,7 @@ object UpdateDocument { val ipTypeList = ipRow.getAs[ofRef[String]]("ip_type_list") val linkInfo = ipRow.getAs[String]("common_link_info") val sepAttributeTuple = separateAttributeByIpType(ipTypeList, sessionCountList, bytesSumList) + val vsysId = ipRow.getAs[Long]("VSYS_ID") if (ipDoc != null) { updateMaxAttribute(ipDoc, lastFoundTime, "LAST_FOUND_TIME") @@ -231,6 +241,7 @@ object UpdateDocument { updateSumAttribute(ipDoc, sepAttributeTuple._3, "CLIENT_SESSION_COUNT") updateSumAttribute(ipDoc, sepAttributeTuple._4, "CLIENT_BYTES_SUM") replaceAttribute(ipDoc, linkInfo, "COMMON_LINK_INFO") + ipDoc.addAttribute("VSYS_ID", vsysId) } else { ipDoc = new BaseDocument ipDoc.setKey(ip) @@ -242,6 +253,7 @@ object UpdateDocument { ipDoc.addAttribute("CLIENT_SESSION_COUNT", sepAttributeTuple._3) ipDoc.addAttribute("CLIENT_BYTES_SUM", sepAttributeTuple._4) ipDoc.addAttribute("COMMON_LINK_INFO", "") + ipDoc.addAttribute("VSYS_ID", vsysId) } } ipDoc @@ -266,6 +278,7 @@ object UpdateDocument { val countTotalList = fqdnLocIpRow.getAs[ofRef[AnyRef]]("COUNT_TOTAL_LIST") val schemaTypeList = fqdnLocIpRow.getAs[ofRef[AnyRef]]("schema_type_list") val distCipRecent = fqdnLocIpRow.getAs[ofRef[String]]("DIST_CIP_RECENT") + val vsysId = fqdnLocIpRow.getAs[Long]("VSYS_ID") val sepAttritubeMap: Map[String, Long] = separateAttributeByProtocol(schemaTypeList, countTotalList) val distinctIp: Array[String] = mergeDistinctIp(distCipRecent) @@ -274,6 
+287,7 @@ object UpdateDocument { updateMaxAttribute(fqdnLocIpDoc, lastFoundTime, "LAST_FOUND_TIME") updateProtocolAttritube(fqdnLocIpDoc, sepAttritubeMap) updateDistinctIp(fqdnLocIpDoc, distinctIp) + fqdnLocIpDoc.addAttribute("VSYS_ID", vsysId) } else { fqdnLocIpDoc = new BaseEdgeDocument() fqdnLocIpDoc.setKey(key) @@ -281,6 +295,8 @@ object UpdateDocument { fqdnLocIpDoc.setTo("IP/" + serverIp) fqdnLocIpDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime) fqdnLocIpDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime) + fqdnLocIpDoc.addAttribute("VSYS_ID", vsysId) + putProtocolAttritube(fqdnLocIpDoc, sepAttritubeMap) putDistinctIp(fqdnLocIpDoc, distinctIp) }