Datasets and SQL

Datasets
The Dataset API provides the type safety and functional programming benefits of RDDs along with the relational model and performance optimizations of the DataFrame API. DataFrame no longer exists as a class in the Java API, so Dataset<Row> must be used to reference a DataFrame going forward.
The following app demonstrates how to create a Dataset with an implicit schema, create a Dataset with an explicit schema, and run SQL queries on the dataset.

Consider a collection named characters:
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 } { "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 } { "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 } { "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 } { "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 } { "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 } { "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 } { "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 } { "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 } { "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }
package com.mongodb.spark_examples;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import com.mongodb.spark.MongoSpark;
import com.mongodb.spark.config.ReadConfig;

public final class DatasetSQLDemo {

  public static void main(final String[] args) throws InterruptedException {

    SparkSession spark = SparkSession.builder()
        .master("local")
        .appName("MongoSparkConnectorIntro")
        .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
        .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
        .getOrCreate();

    // Create a JavaSparkContext using the SparkSession's SparkContext object
    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

    // Load data and infer schema; disregard the toDF() name, as it returns a Dataset
    Dataset<Row> implicitDS = MongoSpark.load(jsc).toDF();
    implicitDS.printSchema();
    implicitDS.show();

    // Load data with an explicit schema defined by the Character bean
    Dataset<Character> explicitDS = MongoSpark.load(jsc).toDS(Character.class);
    explicitDS.printSchema();
    explicitDS.show();

    // Create the temp view and execute the query
    explicitDS.createOrReplaceTempView("characters");
    Dataset<Row> centenarians = spark.sql("SELECT name, age FROM characters WHERE age >= 100");
    centenarians.show();

    // Write the data to the "hundredClub" collection
    MongoSpark.write(centenarians).option("collection", "hundredClub").mode("overwrite").save();

    // Load the data from the "hundredClub" collection
    MongoSpark.load(spark, ReadConfig.create(spark).withOption("collection", "hundredClub"),
        Character.class).show();

    jsc.close();
  }
}
Implicitly Declare a Schema
To create a Dataset from MongoDB data, load the data via MongoSpark and call the JavaMongoRDD.toDF() method. Despite toDF() sounding like a DataFrame method, it is part of the Dataset API and returns a Dataset<Row>.
The dataset's schema is inferred whenever data is read from MongoDB and stored in a Dataset<Row> without specifying a schema-defining Java bean. The schema is inferred by sampling documents from the database. To explicitly declare a schema, see Explicitly Declare a Schema.

The following operation loads data from MongoDB, then uses the Dataset API to create a Dataset and infer the schema:
Dataset<Row> implicitDS = MongoSpark.load(jsc).toDF();
implicitDS.printSchema();
implicitDS.show();
implicitDS.printSchema() outputs the following schema to the console:
root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)
implicitDS.show() outputs the following to the console:
+--------------------+----+-------------+
|                 _id| age|         name|
+--------------------+----+-------------+
|[585024d558bef808...|  50|Bilbo Baggins|
|[585024d558bef808...|1000|      Gandalf|
|[585024d558bef808...| 195|       Thorin|
|[585024d558bef808...| 178|        Balin|
|[585024d558bef808...|  77|         Kíli|
|[585024d558bef808...| 169|       Dwalin|
|[585024d558bef808...| 167|          Óin|
|[585024d558bef808...| 158|        Glóin|
|[585024d558bef808...|  82|         Fíli|
|[585024d558bef808...|null|       Bombur|
+--------------------+----+-------------+
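Schema inference reads a sample of documents from the collection, so fields that appear in only a few documents can be missed. One way to tune this is the connector's sampleSize read option. The snippet below is a minimal sketch, not part of the original example: it assumes the spark and jsc objects from the app above, and the value 10000 is only illustrative.

// Assumes: SparkSession spark, JavaSparkContext jsc from the example app, plus
// import com.mongodb.spark.config.ReadConfig;

// "sampleSize" controls how many documents are sampled for schema inference;
// raise it if rarely populated fields are missing from the inferred schema.
ReadConfig readConfig = ReadConfig.create(spark).withOption("sampleSize", "10000");

Dataset<Row> sampledDS = MongoSpark.load(jsc, readConfig).toDF();
sampledDS.printSchema();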
Explicitly Declare a Schema
By default, reading from MongoDB in a SparkSession infers the schema by sampling documents from the collection. You can also use a Java bean to define the schema explicitly, thus removing the extra queries needed for sampling.
Note

If you provide a Java bean for the schema, MongoDB returns only the declared fields. This helps minimize the data sent across the wire.
The following statement creates a Character Java bean and then uses it to define the schema for the DataFrame:
import java.io.Serializable;

public final class Character implements Serializable {

    private String name;
    private Integer age;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(final Integer age) {
        this.age = age;
    }
}
The bean is passed to the toDS(Class<T> beanClass) method to define the schema for the Dataset:
Dataset<Character> explicitDS = MongoSpark.load(jsc).toDS(Character.class);
explicitDS.printSchema();
explicitDS.show();
explicitDS.printSchema() outputs the following:
root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)
explicitDS.show() outputs the following:
+----+-------------+
| age|         name|
+----+-------------+
|  50|Bilbo Baggins|
|1000|      Gandalf|
| 195|       Thorin|
| 178|        Balin|
|  77|         Kíli|
| 169|       Dwalin|
| 167|          Óin|
| 158|        Glóin|
|  82|         Fíli|
|null|       Bombur|
+----+-------------+
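Because explicitDS is typed as Dataset<Character>, you can also query it through the typed Dataset API instead of SQL. The following sketch is not part of the original example; it uses Spark's FilterFunction with a lambda against the Character bean's getters, so mistakes such as a misspelled field are caught at compile time. Note the null check, since Bombur has no age.

// Assumes: import org.apache.spark.api.java.function.FilterFunction;

// Typed filter over the Character bean; equivalent to the SQL query below.
Dataset<Character> typedCentenarians = explicitDS.filter(
    (FilterFunction<Character>) c -> c.getAge() != null && c.getAge() >= 100);
typedCentenarians.show();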
SQL
Before running SQL queries on your dataset, you must register a temporary view for the dataset.

The following operation registers a characters table and then queries it to find all characters that are 100 or older:
explicitDS.createOrReplaceTempView("characters");
Dataset<Row> centenarians = spark.sql("SELECT name, age FROM characters WHERE age >= 100");
centenarians.show();
centenarians.show() outputs the following:
+-------+----+
|   name| age|
+-------+----+
|Gandalf|1000|
| Thorin| 195|
|  Balin| 178|
| Dwalin| 169|
|    Óin| 167|
|  Glóin| 158|
+-------+----+
Save DataFrames to MongoDB
The MongoDB Spark Connector provides the ability to persist DataFrames to a collection in MongoDB.

The following operation saves centenarians into the hundredClub collection in MongoDB:
/* Note: "overwrite" drops the collection before writing, * use "append" to add to the collection */ MongoSpark.write(centenarians).option("collection", "hundredClub") .mode("overwrite").save();