Write to MongoDB
When saving RDD data into MongoDB, the data must be convertible to a BSON document. You may need to include a map transformation to convert the data into a Document (or BsonDocument, or a DBObject).
The following example creates a 10-document RDD and saves it to the MongoDB collection specified in the SparkConf:
package com.mongodb.spark_examples;

import com.mongodb.spark.MongoSpark;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.SparkSession;

import org.bson.Document;

import static java.util.Arrays.asList;

public final class WriteToMongoDB {

  public static void main(final String[] args) throws InterruptedException {

    SparkSession spark = SparkSession.builder()
      .master("local")
      .appName("MongoSparkConnectorIntro")
      .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
      .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
      .getOrCreate();

    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

    // Create an RDD of 10 documents
    JavaRDD<Document> sparkDocuments = jsc.parallelize(asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)).map
      (new Function<Integer, Document>() {
        public Document call(final Integer i) throws Exception {
          return Document.parse("{test: " + i + "}");
        }
      });

    /*Start Example: Save data from RDD to MongoDB*****************/
    MongoSpark.save(sparkDocuments);
    /*End Example**************************************************/

    jsc.close();
  }
}
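If you are compiling against Java 8 or later, the map transformation can be written more compactly with a lambda. The following is a minimal sketch rather than part of the original example, and it assumes the same SparkSession and JavaSparkContext setup shown above:

// Sketch only: assumes the jsc created in the example above.
JavaRDD<Document> documents = jsc
  .parallelize(asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
  .map(i -> Document.parse("{test: " + i + "}"));  // convert each Integer to a BSON Document

// Saves to the collection named in spark.mongodb.output.uri
MongoSpark.save(documents);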
Using a WriteConfig
MongoSpark.save() can accept a WriteConfig object, which specifies various write configuration settings, such as the collection or the write concern.

For example, the following code saves data to the spark collection with a majority write concern:
package com.mongodb.spark_examples;

import com.mongodb.spark.MongoSpark;
import com.mongodb.spark.config.WriteConfig;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.SparkSession;

import org.bson.Document;

import static java.util.Arrays.asList;

import java.util.HashMap;
import java.util.Map;

public final class WriteToMongoDBWriteConfig {

  public static void main(final String[] args) throws InterruptedException {

    SparkSession spark = SparkSession.builder()
      .master("local")
      .appName("MongoSparkConnectorIntro")
      .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
      .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
      .getOrCreate();

    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

    // Create a custom WriteConfig
    Map<String, String> writeOverrides = new HashMap<String, String>();
    writeOverrides.put("collection", "spark");
    writeOverrides.put("writeConcern.w", "majority");
    WriteConfig writeConfig = WriteConfig.create(jsc).withOptions(writeOverrides);

    // Create an RDD of 10 documents
    JavaRDD<Document> sparkDocuments = jsc.parallelize(asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)).map
      (new Function<Integer, Document>() {
        public Document call(final Integer i) throws Exception {
          return Document.parse("{spark: " + i + "}");
        }
      });

    /*Start Example: Save data from RDD to MongoDB*****************/
    MongoSpark.save(sparkDocuments, writeConfig);
    /*End Example**************************************************/

    jsc.close();
  }
}
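The override keys correspond to the spark.mongodb.output.* settings with that prefix removed, so other write options can be supplied the same way. The following is a minimal sketch rather than part of the original example; it reuses jsc and sparkDocuments from the code above, and the database name is purely illustrative:

// Sketch only: override the target database as well as the collection
// and write concern. "analytics" is a hypothetical database name.
Map<String, String> overrides = new HashMap<String, String>();
overrides.put("database", "analytics");
overrides.put("collection", "spark");
overrides.put("writeConcern.w", "majority");

WriteConfig customConfig = WriteConfig.create(jsc).withOptions(overrides);
MongoSpark.save(sparkDocuments, customConfig);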