/*
 * DISCLAIMER
 *
 * Copyright 2016 ArangoDB GmbH, Cologne, Germany
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Copyright holder is ArangoDB GmbH, Cologne, Germany
 *
 * author Mark - mark at arangodb.com
 */

package cn.ac.iie.spark

import cn.ac.iie.spark.rdd.{ArangoRdd, ReadOptions, WriteOptions}
import cn.ac.iie.spark.vpack.VPackUtils
import com.arangodb.model.DocumentCreateOptions
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row}

import scala.collection.JavaConverters.seqAsJavaListConverter
import scala.reflect.ClassTag

object ArangoSpark {

  /**
   * Save data from the RDD into ArangoDB
   *
   * @param rdd        the RDD with the data to save
   * @param collection the collection to save in
   */
  def save[T](rdd: RDD[T], collection: String): Unit =
    save(rdd, collection, WriteOptions())

  /**
   * Save data from the RDD into ArangoDB
   *
   * @param rdd        the RDD with the data to save
   * @param collection the collection to save in
   * @param options    additional write options
   */
  def save[T](rdd: RDD[T], collection: String, options: WriteOptions): Unit =
    saveRDD(rdd, collection, options, (x: Iterator[T]) => x)

  /**
   * Save data from the Dataset into ArangoDB
   *
   * @param dataset    the Dataset with the data to save
   * @param collection the collection to save in
   */
  def save[T](dataset: Dataset[T], collection: String): Unit =
    saveRDD(dataset.rdd, collection, WriteOptions(), (x: Iterator[T]) => x)

  /**
   * Save data from the Dataset into ArangoDB
   *
   * @param dataset    the Dataset with the data to save
   * @param collection the collection to save in
   * @param options    additional write options
   */
  def save[T](dataset: Dataset[T], collection: String, options: WriteOptions): Unit =
    saveRDD(dataset.rdd, collection, options, (x: Iterator[T]) => x)

  /**
   * Save data from the DataFrame into ArangoDB
   *
   * @param dataframe  the DataFrame with the data to save
   * @param collection the collection to save in
   */
  def saveDF(dataframe: DataFrame, collection: String): Unit =
    saveRDD[Row](dataframe.rdd, collection, WriteOptions(), (x: Iterator[Row]) => x.map { y => VPackUtils.rowToVPack(y) })

  /**
   * Save data from the DataFrame into ArangoDB
   *
   * @param dataframe  the DataFrame with the data to save
   * @param collection the collection to save in
   * @param options    additional write options
   */
  def saveDF(dataframe: DataFrame, collection: String, options: WriteOptions): Unit =
    saveRDD[Row](dataframe.rdd, collection, options, (x: Iterator[Row]) => x.map { y => VPackUtils.rowToVPack(y) })
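  // Usage sketch for the save entry points above (illustrative only, not part
  // of the API): the "users" collection and the User case class are
  // hypothetical, and the WriteOptions(database = ...) named argument assumes
  // the option case class exposes a database field with a default, as the
  // writeOptions.database access in saveRDD below suggests.
  //
  //   case class User(name: String, age: Int)
  //
  //   val users: RDD[User] = sc.parallelize(Seq(User("alice", 30)))
  //   ArangoSpark.save(users, "users", WriteOptions(database = "example"))
  //
  //   // DataFrames go through saveDF, which converts each Row to VelocyPack:
  //   ArangoSpark.saveDF(usersDataFrame, "users")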
  private def saveRDD[T](rdd: RDD[T], collection: String, options: WriteOptions, map: Iterator[T] => Iterator[Any]): Unit = {
    val writeOptions = createWriteOptions(options, rdd.sparkContext.getConf)
    rdd.foreachPartition { p =>
      if (p.nonEmpty) {
        // One ArangoDB client per non-empty partition; the whole partition is
        // written as a single batch, then the client is shut down.
        val arangoDB = createArangoBuilder(writeOptions).build()
        val col = arangoDB.db(writeOptions.database).collection(collection)
        val docs = map(p).toList.asJava
        writeOptions.method match {
          case WriteOptions.INSERT  => col.insertDocuments(docs)
          case WriteOptions.UPDATE  => col.updateDocuments(docs)
          case WriteOptions.REPLACE => col.replaceDocuments(docs)
          case WriteOptions.OVERWRITE =>
            val documentCreateOptions = new DocumentCreateOptions
            documentCreateOptions.overwrite(true)
            documentCreateOptions.silent(true)
            col.insertDocuments(docs, documentCreateOptions)
        }
        arangoDB.shutdown()
      }
    }
  }

  /**
   * Load data from ArangoDB into an RDD
   *
   * @param sparkContext the SparkContext containing the ArangoDB configuration
   * @param collection   the collection to load data from
   */
  def load[T: ClassTag](sparkContext: SparkContext, collection: String): ArangoRdd[T] =
    load(sparkContext, collection, ReadOptions())

  /**
   * Load data from ArangoDB into an RDD
   *
   * @param sparkContext the SparkContext containing the ArangoDB configuration
   * @param collection   the collection to load data from
   * @param options      read options
   */
  def load[T: ClassTag](sparkContext: SparkContext, collection: String, options: ReadOptions): ArangoRdd[T] =
    new ArangoRdd[T](sparkContext, createReadOptions(options, sparkContext.getConf).copy(collection = collection))
}
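
// ---------------------------------------------------------------------------
// End-to-end usage sketch (illustrative, not part of the connector). Every
// concrete name below is an assumption: the SparkConf property keys are only
// guesses at what createWriteOptions/createReadOptions (defined elsewhere in
// this package) actually read, the database and collection names are made up,
// and the named database = ... arguments assume the option case classes expose
// that field with a default. Treat this as a sketch of the intended call
// pattern, not a verified configuration.
// ---------------------------------------------------------------------------
object ArangoSparkUsageExample {

  import org.apache.spark.SparkConf

  // Hypothetical document type; its fields become document attributes.
  case class User(name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("arangodb-spark-example")
      .setMaster("local[*]")
      .set("arangodb.hosts", "127.0.0.1:8529") // assumed property key
      .set("arangodb.user", "root")            // assumed property key
      .set("arangodb.password", "")            // assumed property key
    val sc = new SparkContext(conf)
    try {
      // Write: each non-empty partition is inserted as one batch (see saveRDD).
      val users = sc.parallelize(Seq(User("alice", 30), User("bob", 25)))
      ArangoSpark.save(users, "users", WriteOptions(database = "example"))

      // Read the collection back as an ArangoRdd[User]; deserializing the
      // stored documents into User instances is handled by the connector.
      val loaded = ArangoSpark.load[User](sc, "users", ReadOptions(database = "example"))
      loaded.collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}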