package cn.ac.iie.dao import cn.ac.iie.config.ApplicationConfig import cn.ac.iie.utils.SparkSessionUtil.spark import com.zdjizhi.utils.DateUtils import org.apache.spark.sql.DataFrame import org.slf4j.LoggerFactory import java.util.Date object BaseClickhouseData { private val LOG = LoggerFactory.getLogger(BaseClickhouseData.getClass) val currentHour: Long = System.currentTimeMillis / (60 * 60 * 1000) * 60 * 60 private val timeLimit: (Long, Long) = getTimeLimit private def initClickhouseData(sql: String): DataFrame = { val dataFrame: DataFrame = spark.read.format("jdbc") .option("url", ApplicationConfig.SPARK_READ_CLICKHOUSE_URL) .option("dbtable", sql) .option("driver", ApplicationConfig.SPARK_READ_CLICKHOUSE_DRIVER) .option("user", ApplicationConfig.SPARK_READ_CLICKHOUSE_USER) .option("password", ApplicationConfig.SPARK_READ_CLICKHOUSE_PASSWORD) .option("numPartitions", ApplicationConfig.NUMPARTITIONS) .option("partitionColumn", ApplicationConfig.SPARK_READ_CLICKHOUSE_PARTITIONCOLUMN) .option("lowerBound", timeLimit._2) .option("upperBound", timeLimit._1) .option("fetchsize", ApplicationConfig.SPARK_READ_CLICKHOUSE_FETCHSIZE) .option("socket_timeout", ApplicationConfig.CLICKHOUSE_SOCKET_TIMEOUT) .load() dataFrame.printSchema() dataFrame.createOrReplaceGlobalTempView("dbtable") dataFrame } def getVertexFqdnDf: DataFrame = { val where = "recv_time >= " + timeLimit._2 + " AND recv_time < " + timeLimit._1 val sql = s""" |(SELECT | FQDN,MAX( LAST_FOUND_TIME ) AS LAST_FOUND_TIME,MIN( FIRST_FOUND_TIME ) AS FIRST_FOUND_TIME,vsys_id AS VSYS_ID |FROM | ((SELECT | ssl_sni AS FQDN,MAX( recv_time ) AS LAST_FOUND_TIME,MIN( recv_time ) AS FIRST_FOUND_TIME,vsys_id AS VSYS_ID | FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_SESSION_TABLE} | WHERE $where and decoded_as = 'SSL' GROUP BY ssl_sni,vsys_id | )UNION ALL | (SELECT | http_host AS FQDN,MAX( recv_time ) AS LAST_FOUND_TIME,MIN( recv_time ) AS FIRST_FOUND_TIME,vsys_id AS VSYS_ID | FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_SESSION_TABLE} | WHERE $where and decoded_as = 'HTTP' GROUP BY http_host,vsys_id)) |GROUP BY FQDN,VSYS_ID HAVING FQDN != '') as dbtable """.stripMargin LOG.warn(sql) val frame = initClickhouseData(sql) frame.printSchema() frame } def getVertexIpDf: DataFrame = { val where = "recv_time >= " + timeLimit._2 + " AND recv_time < " + timeLimit._1 val sql = s""" |(SELECT * FROM |((SELECT client_ip AS IP,MIN(recv_time) AS FIRST_FOUND_TIME, |MAX(recv_time) AS LAST_FOUND_TIME, |count(*) as SESSION_COUNT, |SUM(sent_bytes+received_bytes) as BYTES_SUM, |'' as common_link_info, |'client' as ip_type |,vsys_id AS VSYS_ID |FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_SESSION_TABLE} |where $where |group by client_ip,vsys_id) |UNION ALL |(SELECT server_ip AS IP, |MIN(recv_time) AS FIRST_FOUND_TIME, |MAX(recv_time) AS LAST_FOUND_TIME, |count(*) as SESSION_COUNT, |SUM(sent_bytes+received_bytes) as BYTES_SUM, |'' as common_link_info, |'server' as ip_type |,vsys_id AS VSYS_ID |FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_SESSION_TABLE} |where $where |group by server_ip,vsys_id))) as dbtable """.stripMargin LOG.warn(sql) val frame = initClickhouseData(sql) frame.printSchema() frame } def getRelationFqdnLocateIpDf: DataFrame = { val where = "recv_time >= " + timeLimit._2 + " AND recv_time < " + timeLimit._1 val sql = s""" |(SELECT * FROM |((SELECT ssl_sni AS FQDN,server_ip AS destination_ip,MAX(recv_time) AS LAST_FOUND_TIME,MIN(recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL, |toString(groupUniqArray(${ApplicationConfig.DISTINCT_CLIENT_IP_NUM})(source_ip)) AS DIST_CIP_RECENT,'TLS' AS decoded_as_list, vsys_id AS VSYS_ID |FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_SESSION_TABLE} |WHERE $where and decoded_as = 'SSL' GROUP BY ssl_sni,server_ip,vsys_id) |UNION ALL |(SELECT http_host AS FQDN,server_ip AS destination_ip,MAX(recv_time) AS LAST_FOUND_TIME,MIN(recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL, |toString(groupUniqArray(${ApplicationConfig.DISTINCT_CLIENT_IP_NUM})(source_ip)) AS DIST_CIP_RECENT,'HTTP' AS decoded_as_list,vsys_id AS VSYS_ID |FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_SESSION_TABLE} |WHERE $where and decoded_as = 'HTTP' GROUP BY http_host,server_ip,vsys_id)) |WHERE FQDN != '') as dbtable """.stripMargin LOG.warn(sql) val frame = initClickhouseData(sql) frame.printSchema() frame } def getRelationSubidLocateIpDf: DataFrame = { val where = s""" | common_recv_time >= ${getRadiusTimeRange._2} | AND common_recv_time < ${getRadiusTimeRange._1} | AND common_subscriber_id != '' | AND radius_framed_ip != '' | AND radius_packet_type = 4 | AND radius_acct_status_type = 1 """.stripMargin val sql = s""" |( |SELECT common_subscriber_id,radius_framed_ip,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME,common_vsys_id AS VSYS_ID |FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_RADIUS_TABLE} |WHERE $where GROUP BY common_subscriber_id,radius_framed_ip,common_vsys_id |) as dbtable """.stripMargin LOG.warn(sql) val frame = initClickhouseData(sql) frame.printSchema() frame } def getVertexSubidDf: DataFrame = { val where = s""" | common_recv_time >= ${getRadiusTimeRange._2} | AND common_recv_time < ${getRadiusTimeRange._1} | AND common_subscriber_id != '' | AND radius_framed_ip != '' | AND radius_packet_type = 4 | AND radius_acct_status_type = 1 """.stripMargin val sql = s""" |( |SELECT common_subscriber_id,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME,common_vsys_id AS VSYS_ID FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_RADIUS_TABLE} |WHERE $where GROUP BY common_subscriber_id,common_vsys_id |)as dbtable """.stripMargin LOG.warn(sql) val frame = initClickhouseData(sql) frame.printSchema() frame } def getVertexFramedIpDf: DataFrame = { val where = s""" | common_recv_time >= ${getRadiusTimeRange._2} | AND common_recv_time < ${getRadiusTimeRange._1} | AND common_subscriber_id != '' | AND radius_framed_ip != '' | AND radius_packet_type = 4 | AND radius_acct_status_type = 1 """.stripMargin val sql = s""" |( |SELECT radius_framed_ip,MAX(common_recv_time) as LAST_FOUND_TIME,common_vsys_id AS VSYS_ID FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_RADIUS_TABLE} WHERE $where |GROUP BY radius_framed_ip,common_vsys_id |)as dbtable """.stripMargin LOG.warn(sql) val frame = initClickhouseData(sql) frame.printSchema() frame } private def getRadiusTimeRange: (Long, Long) = { val date = DateUtils.getTimeFloor(new Date(System.currentTimeMillis()), "PT1M") val max = date.getTime / 1000 val min = DateUtils.getSomeMinute(date, ApplicationConfig.READ_RADIUS_GRANULARITY).getTime / 1000 (max, min) } def main(args: Array[String]): Unit = { println(getRadiusTimeRange) println(getRadiusTimeRange._2 - getRadiusTimeRange._1) } private def getTimeLimit: (Long, Long) = { var maxTime = 0L var minTime = 0L ApplicationConfig.CLICKHOUSE_TIME_LIMIT_TYPE match { case 0 => maxTime = currentHour minTime = maxTime - ApplicationConfig.UPDATE_INTERVAL case 1 => maxTime = ApplicationConfig.READ_CLICKHOUSE_MAX_TIME minTime = ApplicationConfig.READ_CLICKHOUSE_MIN_TIME case _ => } (maxTime, minTime) } }