diff --git a/ip-learning-spark/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java b/ip-learning-spark/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java
index 1ca66d7..69ceda8 100644
--- a/ip-learning-spark/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java
+++ b/ip-learning-spark/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java
@@ -64,16 +64,6 @@ public class ReadHistoryArangoData extends Thread {
         int i = 0;
         for (T doc : baseDocuments) {
             String key = doc.getKey();
-            switch (table) {
-                case "R_LOCATE_FQDN2IP":
-                    updateProtocolDocument(doc);
-                    deleteDistinctClientIpByTime(doc);
-                    break;
-                case "R_VISIT_IP2FQDN":
-                    updateProtocolDocument(doc);
-                    break;
-                default:
-            }
             int hashCode = Math.abs(key.hashCode()) % ApplicationConfig.THREAD_POOL_NUMBER();
             ConcurrentHashMap tmpMap = map.get(hashCode);
             tmpMap.put(key, doc);
diff --git a/ip-learning-spark/src/main/java/cn/ac/iie/utils/TopDomainUtils.java b/ip-learning-spark/src/main/java/cn/ac/iie/utils/TopDomainUtils.java
new file mode 100644
index 0000000..55201db
--- /dev/null
+++ b/ip-learning-spark/src/main/java/cn/ac/iie/utils/TopDomainUtils.java
@@ -0,0 +1,25 @@
+package cn.ac.iie.utils;
+
+public class TopDomainUtils {
+
+    /**
+     * General-purpose helper: takes a url and returns its domain. The returned domain never contains a port; if it still contains ':' it must be an IPv6 address.
+     * @param oriUrl
+     * @return
+     */
+    public static String getDomainFromUrl(String oriUrl) {
+        String url = oriUrl.split("[?]")[0];
+        url = url.replaceAll("https://", "").replaceAll("http://", "");
+        String domain;
+
+        if (url.split("/")[0].split(":").length <= 2) {
+            domain = url
+                    .split("/")[0]
+                    .split(":")[0];
+        } else {
+            domain = url.split("/")[0];
+        }
+        return domain;
+
+    }
+}
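For reference, a minimal Scala sketch of how the new TopDomainUtils.getDomainFromUrl is expected to behave; the sample URLs are illustrative only, not taken from production data:

    import cn.ac.iie.utils.TopDomainUtils

    object TopDomainUtilsSketch {
      def main(args: Array[String]): Unit = {
        // Query string and scheme are stripped, and the port is dropped for ordinary hosts.
        println(TopDomainUtils.getDomainFromUrl("http://www.example.com:8080/path?a=1")) // www.example.com
        println(TopDomainUtils.getDomainFromUrl("https://cdn.example.com/video.mp4"))    // cdn.example.com
        // More than two ':'-separated segments before the first '/' are treated as an IPv6 literal and kept whole.
        println(TopDomainUtils.getDomainFromUrl("http://2001:db8::1/index.html"))        // 2001:db8::1
      }
    }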
diff --git a/ip-learning-spark/src/main/resources/application.properties b/ip-learning-spark/src/main/resources/application.properties
index 2ccecef..8116da4 100644
--- a/ip-learning-spark/src/main/resources/application.properties
+++ b/ip-learning-spark/src/main/resources/application.properties
@@ -7,13 +7,14 @@ repartitionNumber=36
 spark.serializer=org.apache.spark.serializer.KryoSerializer
 master=local[*]
 #clickhouse source config for spark reads
-spark.read.clickhouse.url=jdbc:clickhouse://192.168.40.186:8123/tsg_galaxy_v3
+#spark.read.clickhouse.url=jdbc:clickhouse://192.168.40.186:8123/tsg_galaxy_v3
+spark.read.clickhouse.url=jdbc:clickhouse://192.168.40.193:8123/tsg_galaxy_zx
 spark.read.clickhouse.driver=ru.yandex.clickhouse.ClickHouseDriver
 spark.read.clickhouse.user=default
 spark.read.clickhouse.password=111111
 spark.read.clickhouse.numPartitions=144
 spark.read.clickhouse.fetchsize=10000
-spark.read.clickhouse.partitionColumn=common_start_time
+spark.read.clickhouse.partitionColumn=recv_time
 clickhouse.socket.timeout=300000
 #arangoDB config
 arangoDB.host=192.168.40.182
@@ -27,9 +28,9 @@ arangoDB.ttl=3600
 thread.pool.number=5

 #clickhouse time range mode, 0: read the past hour; 1: use the specified time range
-clickhouse.time.limit.type=0
-read.clickhouse.max.time=1571245220
-read.clickhouse.min.time=1571245210
+clickhouse.time.limit.type=1
+read.clickhouse.max.time=1571241720
+read.clickhouse.min.time=1571241600

 #arangoDB read mode, 0: normal read; 1: use the specified time range
 arango.time.limit.type=0
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala
index c0d6c6b..5ae0b19 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala
@@ -2,6 +2,7 @@ package cn.ac.iie.dao

 import cn.ac.iie.config.ApplicationConfig
 import cn.ac.iie.utils.SparkSessionUtil.spark
+import cn.ac.iie.utils.TopDomainUtils
 import org.apache.spark.sql.DataFrame
 import org.slf4j.LoggerFactory

@@ -31,13 +32,13 @@
   }

   def loadConnectionDataFromCk(): Unit ={
-    val where = "common_start_time >= " + timeLimit._2 + " AND common_start_time < " + timeLimit._1
+    val where = "recv_time >= " + timeLimit._2 + " AND recv_time < " + timeLimit._1
     val sql =
       s"""
         |(SELECT
-        | ssl_sni,http_host,common_client_ip,common_server_ip,common_start_time,common_c2s_byte_num,common_s2c_byte_num,common_schema_type
+        | s1_domain,s1_referer,s1_s_ip,s1_d_ip,recv_time,media_len
         |FROM
-        | connection_record_log
+        | media_expire_patch
         |WHERE $where) as dbtable
       """.stripMargin
@@ -68,28 +69,31 @@
     initClickhouseData(sql)
   }

+  def getDomain(url:String): String ={
+    TopDomainUtils.getDomainFromUrl(url)
+  }
+
   def getVertexFqdnDf: DataFrame ={
     loadConnectionDataFromCk()
+    spark.udf.register("getDomain",TopDomainUtils.getDomainFromUrl _)
     val sql =
       """
         |SELECT
-        | FQDN,MAX( LAST_FOUND_TIME ) AS LAST_FOUND_TIME,MIN( FIRST_FOUND_TIME ) AS FIRST_FOUND_TIME
+        | FQDN,MAX(LAST_FOUND_TIME) AS LAST_FOUND_TIME,MIN(FIRST_FOUND_TIME) AS FIRST_FOUND_TIME
         |FROM
         | (
         | (SELECT
-        | ssl_sni AS FQDN,MAX( common_start_time ) AS LAST_FOUND_TIME,MIN( common_start_time ) AS FIRST_FOUND_TIME
+        | s1_domain AS FQDN,MAX(recv_time) AS LAST_FOUND_TIME,MIN(recv_time) AS FIRST_FOUND_TIME
         | FROM
         | global_temp.dbtable
-        | WHERE
-        | common_schema_type = 'SSL' GROUP BY ssl_sni
+        | GROUP BY s1_domain
         | )
         | UNION ALL
         | (SELECT
-        | http_host AS FQDN,MAX( common_start_time ) AS LAST_FOUND_TIME,MIN( common_start_time ) AS FIRST_FOUND_TIME
+        | getDomain(s1_referer) AS FQDN,MAX(recv_time) AS LAST_FOUND_TIME,MIN(recv_time) AS FIRST_FOUND_TIME
         | FROM
         | global_temp.dbtable
-        | WHERE
-        | common_schema_type = 'HTTP' GROUP BY http_host
+        | GROUP BY getDomain(s1_referer)
         | )
         | )
         |GROUP BY
@@ -103,6 +107,11 @@
     vertexFqdnDf
   }

+  def main(args: Array[String]): Unit = {
+    val df = getRelationFqdnLocateIpDf
+    df.show(10)
+  }
+
   def getVertexIpDf: DataFrame ={
     loadConnectionDataFromCk()
     val sql =
@@ -113,11 +122,11 @@
         | (
         | (
         | SELECT
-        | common_client_ip AS IP,
-        | MIN(common_start_time) AS FIRST_FOUND_TIME,
-        | MAX(common_start_time) AS LAST_FOUND_TIME,
+        | s1_s_ip AS IP,
+        | MIN(recv_time) AS FIRST_FOUND_TIME,
+        | MAX(recv_time) AS LAST_FOUND_TIME,
         | count(*) as SESSION_COUNT,
-        | sum(common_c2s_byte_num) as BYTES_SUM,
+        | sum(media_len) as BYTES_SUM,
         | 'client' as ip_type
         | FROM
         | global_temp.dbtable
@@ -127,11 +136,11 @@
         | UNION ALL
         | (
         | SELECT
-        | common_server_ip AS IP,
-        | MIN(common_start_time) AS FIRST_FOUND_TIME,
-        | MAX(common_start_time) AS LAST_FOUND_TIME,
+        | s1_d_ip AS IP,
+        | MIN(recv_time) AS FIRST_FOUND_TIME,
+        | MAX(recv_time) AS LAST_FOUND_TIME,
         | count(*) as SESSION_COUNT,
-        | sum(common_s2c_byte_num) as BYTES_SUM,
+        | sum(media_len) as BYTES_SUM,
         | 'server' as ip_type
         | FROM
         | global_temp.dbtable
@@ -148,42 +157,23 @@

   def getRelationFqdnLocateIpDf: DataFrame ={
     loadConnectionDataFromCk()
-    val sslSql =
-      """
-        |SELECT
-        | ssl_sni AS FQDN,
-        | common_server_ip,
-        | MAX(common_start_time) AS LAST_FOUND_TIME,
-        | MIN(common_start_time) AS FIRST_FOUND_TIME,
-        | COUNT(*) AS COUNT_TOTAL,
-        | collect_set(common_client_ip) AS DIST_CIP_RECENT,
-        | 'TLS' AS schema_type
-        |FROM
-        | global_temp.dbtable
-        |WHERE
-        | common_schema_type = 'SSL'
-        |GROUP BY
-        | ssl_sni,common_server_ip
-      """.stripMargin
-    val httpSql =
+    val sql =
       """
         |SELECT
-        | http_host AS FQDN,
-        | common_server_ip,
-        | MAX(common_start_time) AS LAST_FOUND_TIME,
-        | MIN(common_start_time) AS FIRST_FOUND_TIME,
+        | s1_domain AS FQDN,
+        | s1_d_ip AS common_server_ip,
+        | MAX(recv_time) AS LAST_FOUND_TIME,
+        | MIN(recv_time) AS FIRST_FOUND_TIME,
         | COUNT(*) AS COUNT_TOTAL,
-        | collect_set(common_client_ip) AS DIST_CIP_RECENT,
-        | 'HTTP' AS schema_type
+        | collect_set(s1_s_ip) AS DIST_CIP_RECENT
         |FROM
         | global_temp.dbtable
         |WHERE
-        | common_schema_type = 'HTTP'
+        | s1_domain != ''
         |GROUP BY
-        | http_host,common_server_ip
+        | s1_domain,s1_d_ip
       """.stripMargin
-    val sql = s"SELECT * FROM (($sslSql) UNION ALL ($httpSql)) WHERE FQDN != ''"

     LOG.warn(sql)
     val relationFqdnLocateIpDf = spark.sql(sql)
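The referer column is folded down to a host name through a Spark SQL UDF registered from the new Java helper. A minimal, self-contained Scala sketch of that registration/usage pattern; the local master, the in-memory sample rows, and the temp-view name are illustrative only:

    import org.apache.spark.sql.SparkSession
    import cn.ac.iie.utils.TopDomainUtils

    object GetDomainUdfSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("getDomain-sketch").getOrCreate()
        import spark.implicits._

        // Register the Java helper as a SQL UDF, as getVertexFqdnDf now does.
        spark.udf.register("getDomain", TopDomainUtils.getDomainFromUrl _)

        // Illustrative referer values standing in for the s1_referer column.
        Seq("https://www.example.com:443/a?b=1", "http://cdn.example.com/x")
          .toDF("s1_referer")
          .createOrReplaceTempView("dbtable")

        spark.sql("SELECT getDomain(s1_referer) AS FQDN FROM dbtable GROUP BY getDomain(s1_referer)").show()
        spark.stop()
      }
    }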
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala
index 460caed..7749c4e 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala
@@ -35,16 +35,9 @@
   }

   def mergeRelationFqdnLocateIp(): RDD[Row] ={
-    val frame = BaseClickhouseData.getRelationFqdnLocateIpDf.filter(row => isDomain(row.getAs[String]("FQDN")))
-      .groupBy("FQDN", "common_server_ip")
-      .agg(
-        min("FIRST_FOUND_TIME").alias("FIRST_FOUND_TIME"),
-        max("LAST_FOUND_TIME").alias("LAST_FOUND_TIME"),
-        collect_list("COUNT_TOTAL").alias("COUNT_TOTAL_LIST"),
-        collect_list("schema_type").alias("schema_type_list"),
-        collect_set("DIST_CIP_RECENT").alias("DIST_CIP_RECENT")
-      )
-    frame.rdd.map(row => {
+    BaseClickhouseData.getRelationFqdnLocateIpDf
+      .filter(row => isDomain(row.getAs[String]("FQDN")))
+      .rdd.map(row => {
       val fqdn = row.getAs[String]("FQDN")
       val serverIp = row.getAs[String]("common_server_ip")
       val key = fqdn.concat("-"+serverIp)
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala
index bdf8120..52a838b 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala
@@ -93,8 +93,8 @@
     doc.addAttribute("PROTOCOL_TYPE",protocolTypeBuilder.toString().replaceFirst(",",""))
   }

-  def mergeDistinctIp(distCipRecent:ofRef[ofRef[String]]): Array[String] ={
-    distCipRecent.flatten.distinct.take(ApplicationConfig.DISTINCT_CLIENT_IP_NUM).toArray
+  def mergeDistinctIp(distCipRecent:ofRef[String]): Array[String] ={
+    distCipRecent.distinct.take(ApplicationConfig.DISTINCT_CLIENT_IP_NUM).toArray
   }

   def putDistinctIp(doc:BaseEdgeDocument,newDistinctIp:Array[String]): Unit ={
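With the extra groupBy/collect_set pass removed from MergeDataFrame, DIST_CIP_RECENT now reaches UpdateDocHandler as a single collect_set(s1_s_ip) array rather than an array of arrays, so mergeDistinctIp no longer needs flatten. A minimal sketch of the simplified behaviour; the cap of 3 and the sample addresses are illustrative, the real limit comes from ApplicationConfig.DISTINCT_CLIENT_IP_NUM:

    import scala.collection.mutable.WrappedArray.ofRef

    object MergeDistinctIpSketch {
      // Same shape as the updated mergeDistinctIp: dedupe the flat array and cap its size.
      def mergeDistinctIp(distCipRecent: ofRef[String], cap: Int): Array[String] =
        distCipRecent.distinct.take(cap).toArray

      def main(args: Array[String]): Unit = {
        val sample = new ofRef(Array("10.0.0.1", "10.0.0.2", "10.0.0.1", "10.0.0.3"))
        println(mergeDistinctIp(sample, 3).mkString(","))  // 10.0.0.1,10.0.0.2,10.0.0.3
      }
    }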
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
index b7d4875..089cb89 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
@@ -131,18 +131,16 @@
       val serverIp = row.getAs[String]("common_server_ip")
       val firstFoundTime = row.getAs[Long]("FIRST_FOUND_TIME")
       val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME")
-      val countTotalList = row.getAs[ofRef[AnyRef]]("COUNT_TOTAL_LIST")
-      val schemaTypeList = row.getAs[ofRef[AnyRef]]("schema_type_list")
-      val distCipRecent = row.getAs[ofRef[ofRef[String]]]("DIST_CIP_RECENT")
+      val countTotal = row.getAs[Long]("COUNT_TOTAL")
+      val distCipRecent = row.getAs[ofRef[String]]("DIST_CIP_RECENT")

-      val sepAttritubeMap: Map[String, Long] = separateAttributeByProtocol(schemaTypeList, countTotalList)
       val distinctIp: Array[String] = mergeDistinctIp(distCipRecent)
       val key = fqdn.concat("-" + serverIp)
       var document = dictionaryMap.getOrDefault(key, null)
       if (document != null) {
         updateMaxAttribute(document, lastFoundTime, "LAST_FOUND_TIME")
-        updateProtocolAttritube(document, sepAttritubeMap)
+        updateSumAttribute(document,countTotal,"CNT_TOTAL")
         updateDistinctIp(document, distinctIp)
       } else {
         document = new BaseEdgeDocument()
@@ -151,7 +149,7 @@
         document.setTo("IP/" + serverIp)
         document.addAttribute("FIRST_FOUND_TIME", firstFoundTime)
         document.addAttribute("LAST_FOUND_TIME", lastFoundTime)
-        putProtocolAttritube(document, sepAttritubeMap)
+        document.addAttribute("CNT_TOTAL",countTotal)
         putDistinctIp(document, distinctIp)
       }
       document
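updateSumAttribute itself is not part of this patch. Assuming it follows the same pattern as the other update helpers, a minimal sketch of summing CNT_TOTAL onto an existing ArangoDB edge document could look like the following; the method body and the numeric handling are assumptions, not the project's actual implementation:

    import com.arangodb.entity.BaseEdgeDocument

    object UpdateSumAttributeSketch {
      // Assumed behaviour: add the batch count onto whatever value the document already holds.
      def updateSumAttribute(doc: BaseEdgeDocument, delta: Long, attr: String): Unit = {
        val current = doc.getAttribute(attr) match {
          case n: Number => n.longValue()
          case _         => 0L
        }
        doc.updateAttribute(attr, current + delta)
      }
    }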