Unify logging terminology; rename tables

Author: wanglihui
Date:   2021-09-16 11:08:01 +08:00
Parent: 2f7cceb826
Commit: d61fbee61a
5 changed files with 24 additions and 13 deletions


@@ -13,6 +13,8 @@ spark.read.clickhouse.password=ceiec2019
 spark.read.clickhouse.numPartitions=5
 spark.read.clickhouse.fetchsize=10000
 spark.read.clickhouse.partitionColumn=LAST_FOUND_TIME
+spark.read.clickhouse.session.table=session_record
+spark.read.clickhouse.radius.table=radius_record
 clickhouse.socket.timeout=300000
 #arangoDB配置
 #arangoDB.host=192.168.40.223
@@ -38,3 +40,5 @@ distinct.client.ip.num=10000
 recent.count.hour=24
 update.interval=3600
+arangodb.total.num=20000000


@@ -20,6 +20,8 @@ object ApplicationConfig {
 val SPARK_READ_CLICKHOUSE_PASSWORD: String = config.getString("spark.read.clickhouse.password")
 val SPARK_READ_CLICKHOUSE_FETCHSIZE: String = config.getString("spark.read.clickhouse.fetchsize")
 val SPARK_READ_CLICKHOUSE_PARTITIONCOLUMN: String = config.getString("spark.read.clickhouse.partitionColumn")
+val SPARK_READ_CLICKHOUSE_SESSION_TABLE: String = config.getString("spark.read.clickhouse.session.table")
+val SPARK_READ_CLICKHOUSE_RADIUS_TABLE: String = config.getString("spark.read.clickhouse.radius.table")
 val ARANGODB_HOST: String= config.getString("arangoDB.host")
 val ARANGODB_PORT: Int = config.getInt("arangoDB.port")
@@ -42,4 +44,6 @@ object ApplicationConfig {
 val UPDATE_INTERVAL: Int = config.getInt("update.interval")
+val ARANGODB_TOTAL_NUM: Long = config.getLong("arangodb.total.num")
 }
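The `config` handle these vals read from sits outside this diff. As a rough sketch of how the keys introduced by this commit would be loaded, assuming the Typesafe Config library (consistent with the `config.getString`/`config.getInt`/`config.getLong` calls above; the object and val names here are illustrative only):

import com.typesafe.config.{Config, ConfigFactory}

object ConfigSketch {
  // ConfigFactory.load() picks up application.conf / application.properties
  // from the classpath; the real project may load its properties differently.
  private val config: Config = ConfigFactory.load()

  // The settings introduced by this commit.
  val sessionTable: String = config.getString("spark.read.clickhouse.session.table")
  val radiusTable: String  = config.getString("spark.read.clickhouse.radius.table")
  val arangoTotalNum: Long = config.getLong("arangodb.total.num")
}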


@@ -41,12 +41,12 @@ object BaseClickhouseData {
 |FROM
 | ((SELECT
 | ssl_sni AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME
-| FROM connection_record_log
+| FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_SESSION_TABLE}
 | WHERE $where and common_schema_type = 'SSL' GROUP BY ssl_sni
 | )UNION ALL
 | (SELECT
 | http_host AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME
-| FROM connection_record_log
+| FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_SESSION_TABLE}
 | WHERE $where and common_schema_type = 'HTTP' GROUP BY http_host))
 |GROUP BY FQDN HAVING FQDN != '') as dbtable
 """.stripMargin
@@ -67,7 +67,7 @@ object BaseClickhouseData {
 |SUM(common_c2s_byte_num+common_s2c_byte_num) as BYTES_SUM,
 |groupUniqArray(2)(common_link_info_c2s)[2] as common_link_info,
 |'client' as ip_type
-|FROM connection_record_log
+|FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_SESSION_TABLE}
 |where $where
 |group by common_client_ip)
 |UNION ALL
@@ -78,7 +78,7 @@ object BaseClickhouseData {
 |SUM(common_c2s_byte_num+common_s2c_byte_num) as BYTES_SUM,
 |groupUniqArray(2)(common_link_info_s2c)[2] as common_link_info,
 |'server' as ip_type
-|FROM connection_record_log
+|FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_SESSION_TABLE}
 |where $where
 |group by common_server_ip))) as dbtable
 """.stripMargin
@@ -96,12 +96,12 @@ object BaseClickhouseData {
 |(SELECT * FROM
 |((SELECT ssl_sni AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,
 |toString(groupUniqArray(${ApplicationConfig.DISTINCT_CLIENT_IP_NUM})(common_client_ip)) AS DIST_CIP_RECENT,'TLS' AS schema_type
-|FROM connection_record_log
+|FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_SESSION_TABLE}
 |WHERE $where and common_schema_type = 'SSL' GROUP BY ssl_sni,common_server_ip)
 |UNION ALL
 |(SELECT http_host AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,
 |toString(groupUniqArray(${ApplicationConfig.DISTINCT_CLIENT_IP_NUM})(common_client_ip)) AS DIST_CIP_RECENT,'HTTP' AS schema_type
-|FROM connection_record_log
+|FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_SESSION_TABLE}
 |WHERE $where and common_schema_type = 'HTTP' GROUP BY http_host,common_server_ip))
 |WHERE FQDN != '') as dbtable
 """.stripMargin
@@ -123,7 +123,7 @@ object BaseClickhouseData {
s""" s"""
|( |(
|SELECT common_subscriber_id,radius_framed_ip,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME |SELECT common_subscriber_id,radius_framed_ip,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME
|FROM radius_record_log |FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_RADIUS_TABLE}
|WHERE $where GROUP BY common_subscriber_id,radius_framed_ip |WHERE $where GROUP BY common_subscriber_id,radius_framed_ip
|) as dbtable |) as dbtable
""".stripMargin """.stripMargin
@@ -144,7 +144,7 @@ object BaseClickhouseData {
 val sql =
 s"""
 |(
-|SELECT common_subscriber_id,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME FROM radius_record_log
+|SELECT common_subscriber_id,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_RADIUS_TABLE}
 |WHERE $where GROUP BY common_subscriber_id
 |)as dbtable
 """.stripMargin
@@ -165,7 +165,7 @@ object BaseClickhouseData {
 val sql =
 s"""
 |(
-|SELECT radius_framed_ip,MAX(common_recv_time) as LAST_FOUND_TIME FROM radius_record_log WHERE $where
+|SELECT radius_framed_ip,MAX(common_recv_time) as LAST_FOUND_TIME FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_RADIUS_TABLE} WHERE $where
 |GROUP BY radius_framed_ip
 |)as dbtable
 """.stripMargin


@@ -19,13 +19,13 @@ object UpdateDocument {
 def update(): Unit = {
 try {
-updateDocument("FQDN", getVertexFqdnRow, mergeVertexFqdn)
-// updateDocument("SUBSCRIBER", getVertexSubidRow, mergeVertexSubid)
-// insertFrameIp()
-// updateDocument("R_LOCATE_SUBSCRIBER2IP", getRelationSubidLocateIpRow, mergeRelationSubidLocateIp)
+// updateDocument("FQDN", getVertexFqdnRow, mergeVertexFqdn)
+updateDocument("SUBSCRIBER", getVertexSubidRow, mergeVertexSubid)
+insertFrameIp()
+updateDocument("R_LOCATE_SUBSCRIBER2IP", getRelationSubidLocateIpRow, mergeRelationSubidLocateIp)
 updateDocument("R_LOCATE_FQDN2IP", getRelationFqdnLocateIpRow, mergeRelationFqdnLocateIp)


@@ -77,6 +77,9 @@ class ArangoRdd[T: ClassTag](@transient override val sparkContext: SparkContext,
 try {
 val longs = arangoDB.db(options.database).query(sql, classOf[Long])
 while (longs.hasNext) cnt = longs.next
+if (cnt > ApplicationConfig.ARANGODB_TOTAL_NUM){
+  cnt = ApplicationConfig.ARANGODB_TOTAL_NUM
+}
 } catch {
 case e: Exception => LOG.error(sql + s"执行异常:${e.getMessage}")
 }finally {
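The new guard caps the collection count reported to the RDD at arangodb.total.num (20000000 in the properties file), presumably so that downstream sizing does not keep growing with the collection. The same logic in isolation, as a sketch (the helper name is illustrative only):

// Clamp a raw AQL COUNT result to the configured ceiling.
def capCount(rawCount: Long): Long =
  math.min(rawCount, ApplicationConfig.ARANGODB_TOTAL_NUM)

// e.g. with arangodb.total.num=20000000:
//   capCount(25000000L) == 20000000L
//   capCount( 5000000L) ==  5000000L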