From 51d254990205f422a7bb57be136fbfa162169ee5 Mon Sep 17 00:00:00 2001
From: wanglihui <949764788@qq.com>
Date: Wed, 14 Apr 2021 14:20:41 +0800
Subject: [PATCH] Fix class tag compilation bug.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../src/main/resources/application.properties        | 11 ++++++-----
 .../scala/cn/ac/iie/config/ApplicationConfig.scala    |  1 -
 .../src/main/scala/cn/ac/iie/dao/BaseArangoData.scala |  4 +++-
 .../cn/ac/iie/service/update/UpdateDocument.scala     |  8 ++++----
 4 files changed, 13 insertions(+), 11 deletions(-)

diff --git a/ip-learning-spark/src/main/resources/application.properties b/ip-learning-spark/src/main/resources/application.properties
index 7df5c61..3d28d12 100644
--- a/ip-learning-spark/src/main/resources/application.properties
+++ b/ip-learning-spark/src/main/resources/application.properties
@@ -15,22 +15,23 @@ spark.read.clickhouse.fetchsize=10000
 spark.read.clickhouse.partitionColumn=LAST_FOUND_TIME
 clickhouse.socket.timeout=300000
 #arangoDB配置
-arangoDB.host=192.168.40.223
+#arangoDB.host=192.168.40.223
+arangoDB.host=192.168.44.12
 arangoDB.port=8529
 arangoDB.user=root
-arangoDB.password=galaxy_2019
-arangoDB.DB.name=tsg_galaxy_v3
+#arangoDB.password=galaxy_2019
+arangoDB.password=ceiec2019
+arangoDB.DB.name=tsg_galaxy_v3_test
 #arangoDB.DB.name=iplearn_media_domain
 arangoDB.ttl=3600
 thread.pool.number=10
 #读取clickhouse时间范围方式,0:读取过去一小时;1:指定时间范围
-clickhouse.time.limit.type=1
+clickhouse.time.limit.type=0
 read.clickhouse.max.time=1608518990
 read.clickhouse.min.time=1604851201
-arangoDB.read.limit=1
 update.arango.batch=10000
 distinct.client.ip.num=10000
 
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/config/ApplicationConfig.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/config/ApplicationConfig.scala
index 687bdd5..6537496 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/config/ApplicationConfig.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/config/ApplicationConfig.scala
@@ -36,7 +36,6 @@ object ApplicationConfig {
   val READ_CLICKHOUSE_MAX_TIME: Long = config.getLong("read.clickhouse.max.time")
   val READ_CLICKHOUSE_MIN_TIME: Long = config.getLong("read.clickhouse.min.time")
 
-  val ARANGODB_READ_LIMIT: Int = config.getInt("arangoDB.read.limit")
   val UPDATE_ARANGO_BATCH: Int = config.getInt("update.arango.batch")
   val RECENT_COUNT_HOUR: Int = config.getInt("recent.count.hour")
   val DISTINCT_CLIENT_IP_NUM: Int = config.getInt("distinct.client.ip.num")
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseArangoData.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseArangoData.scala
index cafcca8..8a38062 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseArangoData.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseArangoData.scala
@@ -6,11 +6,13 @@ import cn.ac.iie.spark.rdd.{ArangoRdd, ReadOptions}
 import cn.ac.iie.utils.SparkSessionUtil.sparkContext
 import org.slf4j.LoggerFactory
 
+import scala.reflect.ClassTag
+
 object BaseArangoData {
   private val LOG = LoggerFactory.getLogger(BaseArangoData.getClass)
   private val options = ReadOptions(ApplicationConfig.ARANGODB_DB_NAME)
 
-  def loadArangoRdd[T](name:String): ArangoRdd[T] ={
+  def loadArangoRdd[T: ClassTag](name:String): ArangoRdd[T] ={
     val value = ArangoSpark.load[T](sparkContext, name, options)
     LOG.warn(s"读取$name arangoDb:${value.count()}")
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
index 59f30a4..dba2b98 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
@@ -19,13 +19,13 @@ object UpdateDocument {
 
   def update(): Unit = {
     try {
-//      updateDocument("FQDN", getVertexFqdnRow, mergeVertexFqdn)
+      updateDocument("FQDN", getVertexFqdnRow, mergeVertexFqdn)
 
-//      updateDocument("SUBSCRIBER", getVertexSubidRow, mergeVertexSubid)
+      updateDocument("SUBSCRIBER", getVertexSubidRow, mergeVertexSubid)
 
-//      insertFrameIp()
+      insertFrameIp()
 
-//      updateDocument("R_LOCATE_SUBSCRIBER2IP", getRelationSubidLocateIpRow, mergeRelationSubidLocateIp)
+      updateDocument("R_LOCATE_SUBSCRIBER2IP", getRelationSubidLocateIpRow, mergeRelationSubidLocateIp)
 
       updateDocument("R_LOCATE_FQDN2IP", getRelationFqdnLocateIpRow, mergeRelationFqdnLocateIp)
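
Note: the compile fix itself is the [T: ClassTag] context bound added to loadArangoRdd in BaseArangoData.scala, which forwards the caller's ClassTag to ArangoSpark.load[T]; judging by the error this patch addresses, that call needs a runtime class tag for T, so a generic wrapper without its own bound fails with "No ClassTag available for T". Below is a minimal, self-contained sketch of the same pattern; it uses plain Array construction as a stand-in for the ArangoSpark call, and ClassTagSketch, makeStore and loadFixed are illustrative names, not code from this repository.

    import scala.reflect.ClassTag

    object ClassTagSketch {
      // Stand-in for an API that needs a runtime class for T
      // (new Array[T] requires an implicit ClassTag, much like the
      // ArangoSpark.load[T] call in loadArangoRdd).
      def makeStore[T: ClassTag](n: Int): Array[T] = new Array[T](n)

      // Without its own bound this does not compile:
      //   def loadBroken[T](n: Int): Array[T] = makeStore[T](n)
      //   => error: No ClassTag available for T

      // The context bound forwards the caller's ClassTag, mirroring the
      // change to loadArangoRdd[T: ClassTag] in this patch.
      def loadFixed[T: ClassTag](n: Int): Array[T] = makeStore[T](n)

      def main(args: Array[String]): Unit =
        println(loadFixed[Int](3).length) // prints 3
    }

Callers that pass a concrete type (e.g. loadArangoRdd[SomeDoc]("FQDN"), where SomeDoc stands for any concrete document class) are unaffected, since the compiler materializes the ClassTag at the call site.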