This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
wanglihui-ip-learning-graph/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala

218 lines
8.1 KiB
Scala
Raw Normal View History

2020-08-06 16:11:16 +08:00
package cn.ac.iie.dao
import java.util.Date
import cn.ac.iie.config.ApplicationConfig
import cn.ac.iie.utils.SparkSessionUtil.spark
import com.zdjizhi.utils.DateUtils
import org.apache.spark.sql.DataFrame
import org.slf4j.LoggerFactory
2020-08-06 16:11:16 +08:00
object BaseClickhouseData {

  private val LOG = LoggerFactory.getLogger(BaseClickhouseData.getClass)

  /** Wall-clock time at object initialization, floored to the hour, in epoch SECONDS
    * (millis / 3_600_000 drops the sub-hour part, then * 3600 converts hours to seconds). */
  val currentHour: Long = System.currentTimeMillis / (60 * 60 * 1000) * 60 * 60

  /** (maxTime, minTime) window in epoch seconds bounding all session-table reads.
    * NOTE the order: _1 is the upper bound, _2 the lower bound. Computed once at init. */
  private val timeLimit: (Long, Long) = getTimeLimit

  /** Loads the given ClickHouse subquery through the Spark JDBC source, partitioned
    * on the configured time column across [timeLimit._2, timeLimit._1).
    *
    * Side effects: prints the schema and (re)registers the result as the global
    * temp view "dbtable" — every call overwrites the previous view.
    *
    * @param sql a parenthesized subquery aliased "as dbtable", usable as a JDBC dbtable
    * @return the loaded DataFrame
    */
  private def initClickhouseData(sql: String): DataFrame = {
    val dataFrame: DataFrame = spark.read.format("jdbc")
      .option("url", ApplicationConfig.SPARK_READ_CLICKHOUSE_URL)
      .option("dbtable", sql)
      .option("driver", ApplicationConfig.SPARK_READ_CLICKHOUSE_DRIVER)
      .option("user", ApplicationConfig.SPARK_READ_CLICKHOUSE_USER)
      .option("password", ApplicationConfig.SPARK_READ_CLICKHOUSE_PASSWORD)
      .option("numPartitions", ApplicationConfig.NUMPARTITIONS)
      .option("partitionColumn", ApplicationConfig.SPARK_READ_CLICKHOUSE_PARTITIONCOLUMN)
      .option("lowerBound", timeLimit._2)
      .option("upperBound", timeLimit._1)
      .option("fetchsize", ApplicationConfig.SPARK_READ_CLICKHOUSE_FETCHSIZE)
      .option("socket_timeout", ApplicationConfig.CLICKHOUSE_SOCKET_TIMEOUT)
      .load()
    dataFrame.printSchema()
    dataFrame.createOrReplaceGlobalTempView("dbtable")
    dataFrame
  }

  /** Half-open recv_time predicate [min, max) shared by all session-table queries. */
  private def sessionTimeWhere: String =
    "recv_time >= " + timeLimit._2 + " AND recv_time < " + timeLimit._1

  /** Common filter for the RADIUS table: accounting Interim-Update records
    * (packet_type = 4, acct_status_type = 1) with non-empty subscriber/framed-ip,
    * inside the half-open time range [min, max).
    *
    * The range is evaluated ONCE here — interpolating getRadiusTimeRange twice
    * (as the original code did) could yield min and max from different minute
    * windows when the calls straddled a minute boundary. */
  private def radiusWhere: String = {
    val (maxTime, minTime) = getRadiusTimeRange
    s"""
       | common_recv_time >= $minTime
       | AND common_recv_time < $maxTime
       | AND common_subscriber_id != ''
       | AND radius_framed_ip != ''
       | AND radius_packet_type = 4
       | AND radius_acct_status_type = 1
    """.stripMargin
  }

  /** FQDN vertices: SSL SNI and HTTP Host values from the session table, unioned,
    * with first/last seen times aggregated per (FQDN, vsys_id); empty FQDNs dropped. */
  def getVertexFqdnDf: DataFrame = {
    val where = sessionTimeWhere
    val sql =
      s"""
         |(SELECT
         | FQDN,MAX( LAST_FOUND_TIME ) AS LAST_FOUND_TIME,MIN( FIRST_FOUND_TIME ) AS FIRST_FOUND_TIME,vsys_id AS VSYS_ID
         |FROM
         | ((SELECT
         | ssl_sni AS FQDN,MAX( recv_time ) AS LAST_FOUND_TIME,MIN( recv_time ) AS FIRST_FOUND_TIME,vsys_id AS VSYS_ID
         | FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_SESSION_TABLE}
         | WHERE $where and decoded_as = 'SSL' GROUP BY ssl_sni,vsys_id
         | )UNION ALL
         | (SELECT
         | http_host AS FQDN,MAX( recv_time ) AS LAST_FOUND_TIME,MIN( recv_time ) AS FIRST_FOUND_TIME,vsys_id AS VSYS_ID
         | FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_SESSION_TABLE}
         | WHERE $where and decoded_as = 'HTTP' GROUP BY http_host,vsys_id))
         |GROUP BY FQDN,VSYS_ID HAVING FQDN != '') as dbtable
      """.stripMargin
    // WARN level keeps the generated SQL visible at default production log levels.
    LOG.warn(sql)
    initClickhouseData(sql) // already prints the schema; no need to print again here
  }

  /** IP vertices: client and server IPs from the session table, unioned, with
    * session counts, byte sums and first/last seen times per (ip, vsys_id). */
  def getVertexIpDf: DataFrame = {
    val where = sessionTimeWhere
    val sql =
      s"""
         |(SELECT * FROM
         |((SELECT client_ip AS IP,MIN(recv_time) AS FIRST_FOUND_TIME,
         |MAX(recv_time) AS LAST_FOUND_TIME,
         |count(*) as SESSION_COUNT,
         |SUM(sent_bytes+received_bytes) as BYTES_SUM,
         |'' as common_link_info,
         |'client' as ip_type
         |,vsys_id AS VSYS_ID
         |FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_SESSION_TABLE}
         |where $where
         |group by client_ip,vsys_id)
         |UNION ALL
         |(SELECT server_ip AS IP,
         |MIN(recv_time) AS FIRST_FOUND_TIME,
         |MAX(recv_time) AS LAST_FOUND_TIME,
         |count(*) as SESSION_COUNT,
         |SUM(sent_bytes+received_bytes) as BYTES_SUM,
         |'' as common_link_info,
         |'server' as ip_type
         |,vsys_id AS VSYS_ID
         |FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_SESSION_TABLE}
         |where $where
         |group by server_ip,vsys_id))) as dbtable
      """.stripMargin
    LOG.warn(sql)
    initClickhouseData(sql)
  }

  /** FQDN→server-IP "locate" edges from SSL and HTTP sessions, with a capped set
    * of distinct client IPs (groupUniqArray(N)) observed per edge. */
  def getRelationFqdnLocateIpDf: DataFrame = {
    val where = sessionTimeWhere
    val sql =
      s"""
         |(SELECT * FROM
         |((SELECT ssl_sni AS FQDN,server_ip,MAX(recv_time) AS LAST_FOUND_TIME,MIN(recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,
         |toString(groupUniqArray(${ApplicationConfig.DISTINCT_CLIENT_IP_NUM})(client_ip)) AS DIST_CIP_RECENT,'TLS' AS decoded_as_list, vsys_id AS VSYS_ID
         |FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_SESSION_TABLE}
         |WHERE $where and decoded_as = 'SSL' GROUP BY ssl_sni,server_ip,vsys_id)
         |UNION ALL
         |(SELECT http_host AS FQDN,server_ip,MAX(recv_time) AS LAST_FOUND_TIME,MIN(recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,
         |toString(groupUniqArray(${ApplicationConfig.DISTINCT_CLIENT_IP_NUM})(client_ip)) AS DIST_CIP_RECENT,'HTTP' AS decoded_as_list,vsys_id AS VSYS_ID
         |FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_SESSION_TABLE}
         |WHERE $where and decoded_as = 'HTTP' GROUP BY http_host,server_ip,vsys_id))
         |WHERE FQDN != '') as dbtable
      """.stripMargin
    LOG.warn(sql)
    initClickhouseData(sql)
  }

  /** subscriber-id→framed-IP edges from RADIUS accounting records. */
  def getRelationSubidLocateIpDf: DataFrame = {
    val where = radiusWhere
    val sql =
      s"""
         |(
         |SELECT common_subscriber_id,radius_framed_ip,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME,common_vsys_id AS VSYS_ID
         |FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_RADIUS_TABLE}
         |WHERE $where GROUP BY common_subscriber_id,radius_framed_ip,common_vsys_id
         |) as dbtable
      """.stripMargin
    LOG.warn(sql)
    initClickhouseData(sql)
  }

  /** subscriber-id vertices from RADIUS accounting records. */
  def getVertexSubidDf: DataFrame = {
    val where = radiusWhere
    val sql =
      s"""
         |(
         |SELECT common_subscriber_id,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME,common_vsys_id AS VSYS_ID FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_RADIUS_TABLE}
         |WHERE $where GROUP BY common_subscriber_id,common_vsys_id
         |)as dbtable
      """.stripMargin
    LOG.warn(sql)
    initClickhouseData(sql)
  }

  /** framed-IP vertices from RADIUS accounting records (last-seen time only). */
  def getVertexFramedIpDf: DataFrame = {
    val where = radiusWhere
    val sql =
      s"""
         |(
         |SELECT radius_framed_ip,MAX(common_recv_time) as LAST_FOUND_TIME,common_vsys_id AS VSYS_ID FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_RADIUS_TABLE} WHERE $where
         |GROUP BY radius_framed_ip,common_vsys_id
         |)as dbtable
      """.stripMargin
    LOG.warn(sql)
    initClickhouseData(sql)
  }

  /** (maxTime, minTime) for RADIUS reads, in epoch seconds: "now" floored to the
    * minute, back READ_RADIUS_GRANULARITY minutes. Re-evaluated on every call,
    * so a single query must capture the result once (see radiusWhere). */
  private def getRadiusTimeRange: (Long, Long) = {
    val date = DateUtils.getTimeFloor(new Date(System.currentTimeMillis()), "PT1M")
    val max = date.getTime / 1000
    val min = DateUtils.getSomeMinute(date, ApplicationConfig.READ_RADIUS_GRANULARITY).getTime / 1000
    (max, min)
  }

  /** Ad-hoc smoke check of the RADIUS window. The range is captured once so both
    * printed values come from the same window (the original called the method
    * twice per line, which could mix windows across a minute boundary). */
  def main(args: Array[String]): Unit = {
    val range = getRadiusTimeRange
    println(range)
    println(range._2 - range._1)
  }

  /** Resolves the (max, min) session window from configuration:
    * type 0 — a rolling window of UPDATE_INTERVAL seconds ending at currentHour;
    * type 1 — a fixed, explicitly configured window;
    * anything else — (0, 0), i.e. an empty window. */
  private def getTimeLimit: (Long, Long) =
    ApplicationConfig.CLICKHOUSE_TIME_LIMIT_TYPE match {
      case 0 => (currentHour, currentHour - ApplicationConfig.UPDATE_INTERVAL)
      case 1 => (ApplicationConfig.READ_CLICKHOUSE_MAX_TIME, ApplicationConfig.READ_CLICKHOUSE_MIN_TIME)
      case _ => (0L, 0L)
    }
}