diff --git a/ip-learning-spark/src/main/resources/application.properties b/ip-learning-spark/src/main/resources/application.properties
index 3d28d12..3365c33 100644
--- a/ip-learning-spark/src/main/resources/application.properties
+++ b/ip-learning-spark/src/main/resources/application.properties
@@ -13,6 +13,8 @@ spark.read.clickhouse.password=ceiec2019
 spark.read.clickhouse.numPartitions=5
 spark.read.clickhouse.fetchsize=10000
 spark.read.clickhouse.partitionColumn=LAST_FOUND_TIME
+spark.read.clickhouse.session.table=session_record
+spark.read.clickhouse.radius.table=radius_record
 clickhouse.socket.timeout=300000
 #arangoDB配置
 #arangoDB.host=192.168.40.223
@@ -38,3 +40,5 @@ distinct.client.ip.num=10000
 
 recent.count.hour=24
 update.interval=3600
+
+arangodb.total.num=20000000
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/config/ApplicationConfig.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/config/ApplicationConfig.scala
index 6537496..983921f 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/config/ApplicationConfig.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/config/ApplicationConfig.scala
@@ -20,6 +20,8 @@ object ApplicationConfig {
   val SPARK_READ_CLICKHOUSE_PASSWORD: String = config.getString("spark.read.clickhouse.password")
   val SPARK_READ_CLICKHOUSE_FETCHSIZE: String = config.getString("spark.read.clickhouse.fetchsize")
   val SPARK_READ_CLICKHOUSE_PARTITIONCOLUMN: String = config.getString("spark.read.clickhouse.partitionColumn")
+  val SPARK_READ_CLICKHOUSE_SESSION_TABLE: String = config.getString("spark.read.clickhouse.session.table")
+  val SPARK_READ_CLICKHOUSE_RADIUS_TABLE: String = config.getString("spark.read.clickhouse.radius.table")
 
   val ARANGODB_HOST: String= config.getString("arangoDB.host")
   val ARANGODB_PORT: Int = config.getInt("arangoDB.port")
@@ -42,4 +44,6 @@ object ApplicationConfig {
 
   val UPDATE_INTERVAL: Int = config.getInt("update.interval")
 
+  val ARANGODB_TOTAL_NUM: Long = config.getLong("arangodb.total.num")
+
 }
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala
index c33ee14..1a3bad3 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala
@@ -41,12 +41,12 @@ object BaseClickhouseData {
        |FROM
        | ((SELECT
        | ssl_sni AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME
-       | FROM connection_record_log
+       | FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_SESSION_TABLE}
        | WHERE $where and common_schema_type = 'SSL' GROUP BY ssl_sni
        | )UNION ALL
        | (SELECT
        | http_host AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME
-       | FROM connection_record_log
+       | FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_SESSION_TABLE}
        | WHERE $where and common_schema_type = 'HTTP' GROUP BY http_host))
        |GROUP BY FQDN HAVING FQDN != '') as dbtable
      """.stripMargin
@@ -67,7 +67,7 @@ object BaseClickhouseData {
        |SUM(common_c2s_byte_num+common_s2c_byte_num) as BYTES_SUM,
        |groupUniqArray(2)(common_link_info_c2s)[2] as common_link_info,
        |'client' as ip_type
-       |FROM connection_record_log
+       |FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_SESSION_TABLE}
        |where $where
        |group by common_client_ip)
        |UNION ALL
@@ -78,7 +78,7 @@ object BaseClickhouseData {
        |SUM(common_c2s_byte_num+common_s2c_byte_num) as BYTES_SUM,
        |groupUniqArray(2)(common_link_info_s2c)[2] as common_link_info,
        |'server' as ip_type
-       |FROM connection_record_log
+       |FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_SESSION_TABLE}
        |where $where
        |group by common_server_ip))) as dbtable
      """.stripMargin
@@ -96,12 +96,12 @@ object BaseClickhouseData {
        |(SELECT * FROM
        |((SELECT ssl_sni AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,
        |toString(groupUniqArray(${ApplicationConfig.DISTINCT_CLIENT_IP_NUM})(common_client_ip)) AS DIST_CIP_RECENT,'TLS' AS schema_type
-       |FROM connection_record_log
+       |FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_SESSION_TABLE}
        |WHERE $where and common_schema_type = 'SSL' GROUP BY ssl_sni,common_server_ip)
        |UNION ALL
        |(SELECT http_host AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,
        |toString(groupUniqArray(${ApplicationConfig.DISTINCT_CLIENT_IP_NUM})(common_client_ip)) AS DIST_CIP_RECENT,'HTTP' AS schema_type
-       |FROM connection_record_log
+       |FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_SESSION_TABLE}
        |WHERE $where and common_schema_type = 'HTTP' GROUP BY http_host,common_server_ip))
        |WHERE FQDN != '') as dbtable
      """.stripMargin
@@ -123,7 +123,7 @@ object BaseClickhouseData {
      s"""
        |(
        |SELECT common_subscriber_id,radius_framed_ip,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME
-       |FROM radius_record_log
+       |FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_RADIUS_TABLE}
       |WHERE $where GROUP BY common_subscriber_id,radius_framed_ip
       |) as dbtable
      """.stripMargin
@@ -144,7 +144,7 @@ object BaseClickhouseData {
    val sql =
      s"""
        |(
-       |SELECT common_subscriber_id,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME FROM radius_record_log
+       |SELECT common_subscriber_id,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_RADIUS_TABLE}
        |WHERE $where GROUP BY common_subscriber_id
        |)as dbtable
      """.stripMargin
@@ -165,7 +165,7 @@ object BaseClickhouseData {
    val sql =
      s"""
        |(
-       |SELECT radius_framed_ip,MAX(common_recv_time) as LAST_FOUND_TIME FROM radius_record_log WHERE $where
+       |SELECT radius_framed_ip,MAX(common_recv_time) as LAST_FOUND_TIME FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_RADIUS_TABLE} WHERE $where
        |GROUP BY radius_framed_ip
        |)as dbtable
      """.stripMargin
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
index febfe4d..487e9c0 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
@@ -19,13 +19,13 @@ object UpdateDocument {
 
   def update(): Unit = {
     try {
-      updateDocument("FQDN", getVertexFqdnRow, mergeVertexFqdn)
+//      updateDocument("FQDN", getVertexFqdnRow, mergeVertexFqdn)
 
-//      updateDocument("SUBSCRIBER", getVertexSubidRow, mergeVertexSubid)
+      updateDocument("SUBSCRIBER", getVertexSubidRow, mergeVertexSubid)
 
-//      insertFrameIp()
+      insertFrameIp()
 
-//      updateDocument("R_LOCATE_SUBSCRIBER2IP", getRelationSubidLocateIpRow, mergeRelationSubidLocateIp)
+      updateDocument("R_LOCATE_SUBSCRIBER2IP", getRelationSubidLocateIpRow, mergeRelationSubidLocateIp)
 
       updateDocument("R_LOCATE_FQDN2IP", getRelationFqdnLocateIpRow, mergeRelationFqdnLocateIp)
 
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/spark/rdd/ArangoRdd.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/spark/rdd/ArangoRdd.scala
index 7132c19..b498df8 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/spark/rdd/ArangoRdd.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/spark/rdd/ArangoRdd.scala
@@ -77,6 +77,9 @@ class ArangoRdd[T: ClassTag](@transient override val sparkContext: SparkContext,
     try {
       val longs = arangoDB.db(options.database).query(sql, classOf[Long])
       while (longs.hasNext) cnt = longs.next
+      if (cnt > ApplicationConfig.ARANGODB_TOTAL_NUM){
+        cnt = ApplicationConfig.ARANGODB_TOTAL_NUM
+      }
     } catch {
       case e: Exception => LOG.error(sql + s"执行异常:${e.getMessage}")
     }finally {