资源隔离三个任务,增加spark.executor.cores、spark.cores.max资源控制参数。

This commit is contained in:
wanglihui
2021-10-29 18:54:18 +08:00
parent 264afdaa3e
commit bf707f8b2e
9 changed files with 341 additions and 158 deletions

View File

@@ -1,7 +1,10 @@
package cn.ac.iie.dao
import java.util.Date
import cn.ac.iie.config.ApplicationConfig
import cn.ac.iie.utils.SparkSessionUtil.spark
import com.zdjizhi.utils.DateUtils
import org.apache.spark.sql.DataFrame
import org.slf4j.LoggerFactory
@@ -36,19 +39,19 @@ object BaseClickhouseData {
val where = "common_recv_time >= " + timeLimit._2 + " AND common_recv_time < " + timeLimit._1
val sql =
s"""
|(SELECT
| FQDN,MAX( LAST_FOUND_TIME ) AS LAST_FOUND_TIME,MIN( FIRST_FOUND_TIME ) AS FIRST_FOUND_TIME
|FROM
| ((SELECT
| ssl_sni AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME
| FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_SESSION_TABLE}
| WHERE $where and common_schema_type = 'SSL' GROUP BY ssl_sni
| )UNION ALL
| (SELECT
| http_host AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME
| FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_SESSION_TABLE}
| WHERE $where and common_schema_type = 'HTTP' GROUP BY http_host))
|GROUP BY FQDN HAVING FQDN != '') as dbtable
|(SELECT
| FQDN,MAX( LAST_FOUND_TIME ) AS LAST_FOUND_TIME,MIN( FIRST_FOUND_TIME ) AS FIRST_FOUND_TIME
|FROM
| ((SELECT
| ssl_sni AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME
| FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_SESSION_TABLE}
| WHERE $where and common_schema_type = 'SSL' GROUP BY ssl_sni
| )UNION ALL
| (SELECT
| http_host AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME
| FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_SESSION_TABLE}
| WHERE $where and common_schema_type = 'HTTP' GROUP BY http_host))
|GROUP BY FQDN HAVING FQDN != '') as dbtable
""".stripMargin
LOG.warn(sql)
val frame = initClickhouseData(sql)
@@ -114,10 +117,12 @@ object BaseClickhouseData {
def getRelationSubidLocateIpDf: DataFrame = {
val where =
s"""
| common_recv_time >= ${timeLimit._2}
| AND common_recv_time < ${timeLimit._1}
| common_recv_time >= ${getRadiusTimeRange._2}
| AND common_recv_time < ${getRadiusTimeRange._1}
| AND common_subscriber_id != ''
| AND radius_framed_ip != ''
| AND radius_packet_type = 4
| AND radius_acct_status_type = 1
""".stripMargin
val sql =
s"""
@@ -136,8 +141,8 @@ object BaseClickhouseData {
def getVertexSubidDf: DataFrame = {
val where =
s"""
| common_recv_time >= ${timeLimit._2}
| AND common_recv_time < ${timeLimit._1}
| common_recv_time >= ${getRadiusTimeRange._2}
| AND common_recv_time < ${getRadiusTimeRange._1}
| AND common_subscriber_id != ''
| AND radius_framed_ip != ''
| AND radius_packet_type = 4
@@ -159,8 +164,8 @@ object BaseClickhouseData {
def getVertexFramedIpDf: DataFrame = {
val where =
s"""
| common_recv_time >= ${timeLimit._2}
| AND common_recv_time < ${timeLimit._1}
| common_recv_time >= ${getRadiusTimeRange._2}
| AND common_recv_time < ${getRadiusTimeRange._1}
| AND common_subscriber_id != ''
| AND radius_framed_ip != ''
| AND radius_packet_type = 4
@@ -180,6 +185,17 @@ object BaseClickhouseData {
frame
}
private def getRadiusTimeRange: (Long, Long) = {
val date = DateUtils.getTimeFloor(new Date(System.currentTimeMillis()), "PT1M")
val max = date.getTime / 1000
val min = DateUtils.getSomeMinute(date, ApplicationConfig.READ_RADIUS_GRANULARITY).getTime / 1000
(max, min)
}
def main(args: Array[String]): Unit = {
println(getRadiusTimeRange)
println(getRadiusTimeRange._2 - getRadiusTimeRange._1)
}
private def getTimeLimit: (Long, Long) = {
var maxTime = 0L