Datasets and SQL
Datasets
The Dataset API provides the type safety and functional programming benefits of RDDs along with the relational model and performance optimizations of the DataFrame API. DataFrame no longer exists as a class in the Java API, so use Dataset<Row> to reference a DataFrame going forward.
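In practice this changes only the declared type: a load through the connector is held directly as a Dataset<Row>. A minimal sketch, assuming the MongoSpark.load(SparkSession) overload of connector 2.x and an existing SparkSession named spark:

// In the Java API a DataFrame is simply a Dataset of Row objects,
// so loads are declared as Dataset<Row> rather than DataFrame.
// Assumes the MongoSpark.load(SparkSession) overload (connector 2.x).
Dataset<Row> df = MongoSpark.load(spark);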
The following app demonstrates how to create a Dataset with an implicit schema, create a Dataset with an explicit schema, and run SQL queries on the dataset.
Consider a collection named characters:
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 } { "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 } { "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 } { "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 } { "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 } { "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 } { "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 } { "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 } { "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 } { "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }
package com.mongodb.spark_examples;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import com.mongodb.spark.MongoSpark;
import com.mongodb.spark.config.ReadConfig;

public final class DatasetSQLDemo {

  public static void main(final String[] args) throws InterruptedException {

    SparkSession spark = SparkSession.builder()
        .master("local")
        .appName("MongoSparkConnectorIntro")
        .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
        .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
        .getOrCreate();

    // Create a JavaSparkContext using the SparkSession's SparkContext object
    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

    // Load data and infer the schema; despite its name, toDF()
    // returns a Dataset<Row>, not a DataFrame
    Dataset<Row> implicitDS = MongoSpark.load(jsc).toDF();
    implicitDS.printSchema();
    implicitDS.show();

    // Load data with an explicit schema defined by the Character bean
    Dataset<Character> explicitDS = MongoSpark.load(jsc).toDS(Character.class);
    explicitDS.printSchema();
    explicitDS.show();

    // Create the temp view and execute the query
    explicitDS.createOrReplaceTempView("characters");
    Dataset<Row> centenarians = spark.sql("SELECT name, age FROM characters WHERE age >= 100");
    centenarians.show();

    // Write the data to the "hundredClub" collection
    MongoSpark.write(centenarians).option("collection", "hundredClub").mode("overwrite").save();

    // Read the data back from the "hundredClub" collection
    MongoSpark.load(spark, ReadConfig.create(spark).withOption("collection", "hundredClub"), Character.class).show();

    jsc.close();
  }
}
Implicitly Declare a Schema
To create a Dataset from MongoDB data, load the data via MongoSpark and call the JavaMongoRDD.toDF() method. Despite toDF() sounding like a DataFrame method, it is part of the Dataset API and returns a Dataset<Row>.
When data is read from MongoDB into a Dataset<Row> without a schema-defining Java bean, the schema is inferred by sampling documents from the database. To explicitly declare a schema, see Explicitly Declare a Schema.
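Because inference issues extra sampling queries at read time, you may want to control how many documents are sampled. A minimal sketch, assuming the sampleSize read option of connector 2.x (the value 500 is purely illustrative):

// Configure schema-inference sampling when building the session.
// Assumes the "spark.mongodb.input.sampleSize" option of connector 2.x.
SparkSession spark = SparkSession.builder()
    .master("local")
    .appName("SchemaInferenceDemo")
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.characters")
    .config("spark.mongodb.input.sampleSize", "500")
    .getOrCreate();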
The following operation loads data from MongoDB, then uses the Dataset API to create a Dataset and infer the schema:
Dataset<Row> implicitDS = MongoSpark.load(jsc).toDF();
implicitDS.printSchema();
implicitDS.show();
implicitDS.printSchema() outputs the following schema to the console:
root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)
implicitDS.show() outputs the following to the console:
+--------------------+----+-------------+
|                 _id| age|         name|
+--------------------+----+-------------+
|[585024d558bef808...|  50|Bilbo Baggins|
|[585024d558bef808...|1000|      Gandalf|
|[585024d558bef808...| 195|       Thorin|
|[585024d558bef808...| 178|        Balin|
|[585024d558bef808...|  77|         Kíli|
|[585024d558bef808...| 169|       Dwalin|
|[585024d558bef808...| 167|          Óin|
|[585024d558bef808...| 158|        Glóin|
|[585024d558bef808...|  82|         Fíli|
|[585024d558bef808...|null|       Bombur|
+--------------------+----+-------------+
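Because the inferred _id column is a struct, its oid field can be selected with Spark's dot notation for struct fields. A short illustrative query, not part of the sample app:

// Select the nested ObjectId string from the inferred _id struct.
implicitDS.select("_id.oid", "name").show();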
Explicitly Declare a Schema
By default, reading from MongoDB in a SparkSession infers the schema by sampling documents from the collection. You can also use a Java bean to define the schema explicitly, which removes the extra queries needed for sampling.
Note
If you provide a Java bean for the schema, MongoDB returns only the declared fields. This helps minimize the data sent across the wire.
The following statement creates a Character Java bean and then uses it to define the schema for the Dataset:
import java.io.Serializable;

public final class Character implements Serializable {

  private String name;
  private Integer age;

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public Integer getAge() {
    return age;
  }

  public void setAge(final Integer age) {
    this.age = age;
  }
}
The bean is passed to the toDS(Class<T> beanClass) method to define the schema for the Dataset:
Dataset<Character> explicitDS = MongoSpark.load(jsc).toDS(Character.class);
explicitDS.printSchema();
explicitDS.show();
explicitDS.printSchema() outputs the following:
root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)
explicitDS.show() outputs the following:
+----+-------------+
| age|         name|
+----+-------------+
|  50|Bilbo Baggins|
|1000|      Gandalf|
| 195|       Thorin|
| 178|        Balin|
|  77|         Kíli|
| 169|       Dwalin|
| 167|          Óin|
| 158|        Glóin|
|  82|         Fíli|
|null|       Bombur|
+----+-------------+
SQL
Before running SQL queries on a dataset, you must register a temporary view for it.
The following operation registers a temporary view named characters and then queries it to find all characters that are 100 or older:
explicitDS.createOrReplaceTempView("characters");
Dataset<Row> centenarians = spark.sql("SELECT name, age FROM characters WHERE age >= 100");
centenarians.show();
centenarians.show() outputs the following:
+-------+----+
|   name| age|
+-------+----+
|Gandalf|1000|
| Thorin| 195|
|  Balin| 178|
| Dwalin| 169|
|    Óin| 167|
|  Glóin| 158|
+-------+----+
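Any Spark SQL statement can run against the registered view. For example, an aggregate over the same view (an illustrative extra query, not part of the sample app):

// Compute the average age across all characters with a known age;
// AVG ignores Bombur's null age.
Dataset<Row> avgAge = spark.sql("SELECT AVG(age) AS avg_age FROM characters");
avgAge.show();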
Save DataFrames to MongoDB
The MongoDB Spark Connector provides the ability to persist DataFrames to a collection in MongoDB.
The following operation saves centenarians into the hundredClub collection in MongoDB:
/* Note: "overwrite" drops the collection before writing, * use "append" to add to the collection */ MongoSpark.write(centenarians).option("collection", "hundredClub") .mode("overwrite").save();
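As the comment notes, overwrite drops the target collection before writing. To add rows to an existing collection instead, use append mode; a minimal sketch of the alternative mentioned above:

// Append the centenarians rows to hundredClub without dropping
// the existing collection first.
MongoSpark.write(centenarians).option("collection", "hundredClub")
    .mode("append").save();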