diff --git a/ip-learning-spark/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java b/ip-learning-spark/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java index 1ca66d7..4ccd6e1 100644 --- a/ip-learning-spark/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java +++ b/ip-learning-spark/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java @@ -67,7 +67,7 @@ public class ReadHistoryArangoData extends Thread { switch (table) { case "R_LOCATE_FQDN2IP": updateProtocolDocument(doc); - deleteDistinctClientIpByTime(doc); +// deleteDistinctClientIpByTime(doc); break; case "R_VISIT_IP2FQDN": updateProtocolDocument(doc); diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala index 952c30c..99496ea 100644 --- a/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala +++ b/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala @@ -156,7 +156,6 @@ object BaseClickhouseData { | MAX(common_recv_time) AS LAST_FOUND_TIME, | MIN(common_recv_time) AS FIRST_FOUND_TIME, | COUNT(*) AS COUNT_TOTAL, - | collect_set(common_client_ip) AS DIST_CIP_RECENT, | 'TLS' AS schema_type |FROM | global_temp.dbtable @@ -174,7 +173,6 @@ object BaseClickhouseData { | MAX(common_recv_time) AS LAST_FOUND_TIME, | MIN(common_recv_time) AS FIRST_FOUND_TIME, | COUNT(*) AS COUNT_TOTAL, - | collect_set(common_client_ip) AS DIST_CIP_RECENT, | 'HTTP' AS schema_type |FROM | global_temp.dbtable diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala index 460caed..f94195e 100644 --- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala +++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala @@ -41,8 +41,7 @@ object MergeDataFrame { min("FIRST_FOUND_TIME").alias("FIRST_FOUND_TIME"), max("LAST_FOUND_TIME").alias("LAST_FOUND_TIME"), collect_list("COUNT_TOTAL").alias("COUNT_TOTAL_LIST"), - collect_list("schema_type").alias("schema_type_list"), - collect_set("DIST_CIP_RECENT").alias("DIST_CIP_RECENT") + collect_list("schema_type").alias("schema_type_list") ) frame.rdd.map(row => { val fqdn = row.getAs[String]("FQDN") diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala index b7d4875..fda2b00 100644 --- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala +++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala @@ -133,17 +133,14 @@ object UpdateDocument { val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME") val countTotalList = row.getAs[ofRef[AnyRef]]("COUNT_TOTAL_LIST") val schemaTypeList = row.getAs[ofRef[AnyRef]]("schema_type_list") - val distCipRecent = row.getAs[ofRef[ofRef[String]]]("DIST_CIP_RECENT") val sepAttritubeMap: Map[String, Long] = separateAttributeByProtocol(schemaTypeList, countTotalList) - val distinctIp: Array[String] = mergeDistinctIp(distCipRecent) val key = fqdn.concat("-" + serverIp) var document = dictionaryMap.getOrDefault(key, null) if (document != null) { updateMaxAttribute(document, lastFoundTime, "LAST_FOUND_TIME") updateProtocolAttritube(document, sepAttritubeMap) - updateDistinctIp(document, distinctIp) } else { document = new BaseEdgeDocument() document.setKey(key) @@ -152,7 +149,6 @@ object UpdateDocument { document.addAttribute("FIRST_FOUND_TIME", firstFoundTime) document.addAttribute("LAST_FOUND_TIME", lastFoundTime) putProtocolAttritube(document, sepAttritubeMap) - putDistinctIp(document, distinctIp) } document }