自定义ArangoRDD
This commit is contained in:
@@ -46,7 +46,7 @@ class ArangoRdd[T: ClassTag](@transient override val sparkContext: SparkContext,
|
||||
LOG.info(sql)
|
||||
arangoCursor = arangoDB.db(options.database).query(sql,clazz.runtimeClass.asInstanceOf[Class[T]])
|
||||
}catch {
|
||||
case e: Exception => LOG.error("创建Cursor异常")
|
||||
case e: Exception => LOG.error(s"创建Cursor异常:${e.getMessage}")
|
||||
}finally {
|
||||
arangoDB.shutdown()
|
||||
}
|
||||
@@ -66,13 +66,13 @@ class ArangoRdd[T: ClassTag](@transient override val sparkContext: SparkContext,
|
||||
private def getCountTotal: Long = {
|
||||
val arangoDB = spark.createArangoBuilder(options).build()
|
||||
var cnt = 0L
|
||||
val sql = "RETURN LENGTH(" + options.collection + ")"
|
||||
val sql = s"RETURN LENGTH(${options.collection})"
|
||||
LOG.info(sql)
|
||||
try {
|
||||
val longs = arangoDB.db(options.database).query(sql, classOf[Long])
|
||||
while (longs.hasNext) cnt = longs.next
|
||||
} catch {
|
||||
case e: Exception => LOG.error(sql + "执行异常")
|
||||
case e: Exception => LOG.error(sql + s"执行异常:${e.getMessage}")
|
||||
}finally {
|
||||
arangoDB.shutdown()
|
||||
}
|
||||
|
||||
@@ -27,6 +27,9 @@ object RDDTest {
|
||||
doc.addAttribute("abc", 1)
|
||||
doc
|
||||
})
|
||||
|
||||
value.map(doc => {(doc.getKey,doc)})
|
||||
|
||||
value.persist(StorageLevel.MEMORY_AND_DISK)
|
||||
|
||||
value.foreach(row => println(row.toString))
|
||||
|
||||
Reference in New Issue
Block a user