IP Learning spark中心测试版本
This commit is contained in:
@@ -67,7 +67,7 @@ public class ReadHistoryArangoData<T extends BaseDocument> extends Thread {
|
||||
switch (table) {
|
||||
case "R_LOCATE_FQDN2IP":
|
||||
updateProtocolDocument(doc);
|
||||
deleteDistinctClientIpByTime(doc);
|
||||
// deleteDistinctClientIpByTime(doc);
|
||||
break;
|
||||
case "R_VISIT_IP2FQDN":
|
||||
updateProtocolDocument(doc);
|
||||
|
||||
@@ -156,7 +156,6 @@ object BaseClickhouseData {
|
||||
| MAX(common_recv_time) AS LAST_FOUND_TIME,
|
||||
| MIN(common_recv_time) AS FIRST_FOUND_TIME,
|
||||
| COUNT(*) AS COUNT_TOTAL,
|
||||
| collect_set(common_client_ip) AS DIST_CIP_RECENT,
|
||||
| 'TLS' AS schema_type
|
||||
|FROM
|
||||
| global_temp.dbtable
|
||||
@@ -174,7 +173,6 @@ object BaseClickhouseData {
|
||||
| MAX(common_recv_time) AS LAST_FOUND_TIME,
|
||||
| MIN(common_recv_time) AS FIRST_FOUND_TIME,
|
||||
| COUNT(*) AS COUNT_TOTAL,
|
||||
| collect_set(common_client_ip) AS DIST_CIP_RECENT,
|
||||
| 'HTTP' AS schema_type
|
||||
|FROM
|
||||
| global_temp.dbtable
|
||||
|
||||
@@ -41,8 +41,7 @@ object MergeDataFrame {
|
||||
min("FIRST_FOUND_TIME").alias("FIRST_FOUND_TIME"),
|
||||
max("LAST_FOUND_TIME").alias("LAST_FOUND_TIME"),
|
||||
collect_list("COUNT_TOTAL").alias("COUNT_TOTAL_LIST"),
|
||||
collect_list("schema_type").alias("schema_type_list"),
|
||||
collect_set("DIST_CIP_RECENT").alias("DIST_CIP_RECENT")
|
||||
collect_list("schema_type").alias("schema_type_list")
|
||||
)
|
||||
frame.rdd.map(row => {
|
||||
val fqdn = row.getAs[String]("FQDN")
|
||||
|
||||
@@ -133,17 +133,14 @@ object UpdateDocument {
|
||||
val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME")
|
||||
val countTotalList = row.getAs[ofRef[AnyRef]]("COUNT_TOTAL_LIST")
|
||||
val schemaTypeList = row.getAs[ofRef[AnyRef]]("schema_type_list")
|
||||
val distCipRecent = row.getAs[ofRef[ofRef[String]]]("DIST_CIP_RECENT")
|
||||
|
||||
val sepAttritubeMap: Map[String, Long] = separateAttributeByProtocol(schemaTypeList, countTotalList)
|
||||
val distinctIp: Array[String] = mergeDistinctIp(distCipRecent)
|
||||
|
||||
val key = fqdn.concat("-" + serverIp)
|
||||
var document = dictionaryMap.getOrDefault(key, null)
|
||||
if (document != null) {
|
||||
updateMaxAttribute(document, lastFoundTime, "LAST_FOUND_TIME")
|
||||
updateProtocolAttritube(document, sepAttritubeMap)
|
||||
updateDistinctIp(document, distinctIp)
|
||||
} else {
|
||||
document = new BaseEdgeDocument()
|
||||
document.setKey(key)
|
||||
@@ -152,7 +149,6 @@ object UpdateDocument {
|
||||
document.addAttribute("FIRST_FOUND_TIME", firstFoundTime)
|
||||
document.addAttribute("LAST_FOUND_TIME", lastFoundTime)
|
||||
putProtocolAttritube(document, sepAttritubeMap)
|
||||
putDistinctIp(document, distinctIp)
|
||||
}
|
||||
document
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user