自定义ArangoRDD
This commit is contained in:
@@ -0,0 +1,131 @@
|
||||
/*
|
||||
* DISCLAIMER
|
||||
*
|
||||
* Copyright 2016 ArangoDB GmbH, Cologne, Germany
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
* Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
*
|
||||
* author Mark - mark at arangodb.com
|
||||
*/
|
||||
|
||||
package cn.ac.iie.spark
|
||||
|
||||
import cn.ac.iie.spark.rdd.{ArangoRdd, ReadOptions, WriteOptions}
|
||||
import cn.ac.iie.spark.vpack.VPackUtils
|
||||
import org.apache.spark.SparkContext
|
||||
import org.apache.spark.rdd.RDD
|
||||
import org.apache.spark.sql.{DataFrame, Dataset, Row}
|
||||
|
||||
import scala.collection.JavaConverters.seqAsJavaListConverter
|
||||
import scala.reflect.ClassTag
|
||||
|
||||
object ArangoSpark {
|
||||
|
||||
/**
|
||||
* Save data from rdd into ArangoDB
|
||||
*
|
||||
* @param rdd the rdd with the data to save
|
||||
* @param collection the collection to save in
|
||||
*/
|
||||
def save[T](rdd: RDD[T], collection: String): Unit =
|
||||
save(rdd, collection, WriteOptions())
|
||||
|
||||
/**
|
||||
* Save data from rdd into ArangoDB
|
||||
*
|
||||
* @param rdd the rdd with the data to save
|
||||
* @param collection the collection to save in
|
||||
* @param options additional write options
|
||||
*/
|
||||
def save[T](rdd: RDD[T], collection: String, options: WriteOptions): Unit =
|
||||
saveRDD(rdd, collection, options, (x: Iterator[T]) => x)
|
||||
|
||||
/**
|
||||
* Save data from dataset into ArangoDB
|
||||
*
|
||||
* @param dataset the dataset with data to save
|
||||
* @param collection the collection to save in
|
||||
*/
|
||||
def save[T](dataset: Dataset[T], collection: String): Unit =
|
||||
saveRDD(dataset.rdd, collection, WriteOptions(), (x: Iterator[T]) => x)
|
||||
|
||||
/**
|
||||
* Save data from dataset into ArangoDB
|
||||
*
|
||||
* @param dataset the dataset with data to save
|
||||
* @param collection the collection to save in
|
||||
* @param options additional write options
|
||||
*/
|
||||
def save[T](dataset: Dataset[T], collection: String, options: WriteOptions): Unit =
|
||||
saveRDD(dataset.rdd, collection, options, (x: Iterator[T]) => x)
|
||||
|
||||
/**
|
||||
* Save data from dataframe into ArangoDB
|
||||
*
|
||||
* @param dataframe the dataframe with data to save
|
||||
* @param collection the collection to save in
|
||||
* @param options additional write options
|
||||
*/
|
||||
def saveDF(dataframe: DataFrame, collection: String): Unit =
|
||||
saveRDD[Row](dataframe.rdd, collection, WriteOptions(), (x: Iterator[Row]) => x.map { y => VPackUtils.rowToVPack(y) })
|
||||
|
||||
/**
|
||||
* Save data from dataframe into ArangoDB
|
||||
*
|
||||
* @param dataframe the dataframe with data to save
|
||||
* @param collection the collection to save in
|
||||
* @param options additional write options
|
||||
*/
|
||||
def saveDF(dataframe: DataFrame, collection: String, options: WriteOptions): Unit =
|
||||
saveRDD[Row](dataframe.rdd, collection, options, (x: Iterator[Row]) => x.map { y => VPackUtils.rowToVPack(y) })
|
||||
|
||||
private def saveRDD[T](rdd: RDD[T], collection: String, options: WriteOptions, map: Iterator[T] => Iterator[Any]): Unit = {
|
||||
val writeOptions = createWriteOptions(options, rdd.sparkContext.getConf)
|
||||
rdd.foreachPartition { p =>
|
||||
if (p.nonEmpty) {
|
||||
val arangoDB = createArangoBuilder(writeOptions).build()
|
||||
val col = arangoDB.db(writeOptions.database).collection(collection)
|
||||
val docs = map(p).toList.asJava
|
||||
writeOptions.method match {
|
||||
case WriteOptions.INSERT => col.insertDocuments(docs)
|
||||
case WriteOptions.UPDATE => col.updateDocuments(docs)
|
||||
case WriteOptions.REPLACE => col.replaceDocuments(docs)
|
||||
}
|
||||
|
||||
arangoDB.shutdown()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Load data from ArangoDB into rdd
|
||||
*
|
||||
* @param sparkContext the sparkContext containing the ArangoDB configuration
|
||||
* @param collection the collection to load data from
|
||||
*/
|
||||
def load[T: ClassTag](sparkContext: SparkContext, collection: String): ArangoRdd[T] =
|
||||
load(sparkContext, collection, ReadOptions())
|
||||
|
||||
/**
|
||||
* Load data from ArangoDB into rdd
|
||||
*
|
||||
* @param sparkContext the sparkContext containing the ArangoDB configuration
|
||||
* @param collection the collection to load data from
|
||||
* @param additional read options
|
||||
*/
|
||||
def load[T: ClassTag](sparkContext: SparkContext, collection: String, options: ReadOptions): ArangoRdd[T] =
|
||||
new ArangoRdd[T](sparkContext, createReadOptions(options, sparkContext.getConf).copy(collection = collection))
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user