Datasets and SQL
Note
Source Code
For the source code that contains the examples below, see SparkSQL.scala.
Getting Started
This tutorial works either as a self-contained Scala application or as individual commands in the Spark Shell.
Insert the following documents into the characters collection:
package com.mongodb

object SparkSQL {

  def main(args: Array[String]): Unit = {

    import org.apache.spark.sql.SparkSession

    /* For Self-Contained Scala Apps: Create the SparkSession
     * CREATED AUTOMATICALLY IN spark-shell */
    val sparkSession = SparkSession.builder()
      .master("local")
      .appName("MongoSparkConnectorIntro")
      .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.characters")
      .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.characters")
      .getOrCreate()

    import com.mongodb.spark._
    import com.mongodb.spark.config._
    import org.bson.Document

    val docs = """
      {"name": "Bilbo Baggins", "age": 50}
      {"name": "Gandalf", "age": 1000}
      {"name": "Thorin", "age": 195}
      {"name": "Balin", "age": 178}
      {"name": "Kíli", "age": 77}
      {"name": "Dwalin", "age": 169}
      {"name": "Óin", "age": 167}
      {"name": "Glóin", "age": 158}
      {"name": "Fíli", "age": 82}
      {"name": "Bombur"}""".trim.stripMargin.split("[\\r\\n]+").toSeq

    sparkSession.sparkContext.parallelize(docs.map(Document.parse)).saveToMongoDB()

    // Additional operations go here...

  }
}
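To confirm the insert worked, you can read the collection straight back. This quick check is an addition for convenience, not part of the original example:

// Load the characters collection back and count the inserted documents
val insertedCount = MongoSpark.load(sparkSession).count()
println(s"characters collection now holds $insertedCount documents") // expect 10 for the sample data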
DataFrames and Datasets
New in Spark 2.0, a DataFrame is represented by a Dataset of Rows and is now an alias of Dataset[Row].
The Mongo Spark Connector provides the com.mongodb.spark.sql.DefaultSource class that creates DataFrames and Datasets from MongoDB. Use the connector's MongoSpark helper to facilitate the creation of a DataFrame:
val df = MongoSpark.load(sparkSession)  // Uses the SparkSession
df.printSchema()                        // Prints DataFrame schema
The operation prints the following:
root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)
Note
By default, reading from MongoDB in a SparkSession infers the schema by sampling documents from the database. To explicitly declare a schema, see Explicitly Declare a Schema.
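If sampling is too expensive for a very large collection, the sample can be tuned through the read configuration. The following is a minimal sketch that assumes the connector's sampleSize read option controls how many documents are sampled for schema inference; check the configuration reference for your connector version:

import com.mongodb.spark.config.ReadConfig

// Assumption: "sampleSize" limits the documents sampled during schema inference
val sampledReadConfig = ReadConfig(Map("sampleSize" -> "200"), Some(ReadConfig(sparkSession)))
val sampledDF = MongoSpark.load(sparkSession, sampledReadConfig)
sampledDF.printSchema()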
Alternatively, you can use SparkSession methods to create DataFrames:
val df2 = sparkSession.loadFromMongoDB() // SparkSession used for configuration
val df3 = sparkSession.loadFromMongoDB(
  ReadConfig(Map("uri" -> "mongodb://example.com/database.collection"))
) // ReadConfig used for configuration

val df4 = sparkSession.read.mongo()      // SparkSession used for configuration
sparkSession.read.format("mongo").load()

// Set custom options
import com.mongodb.spark.config._

val customReadConfig = ReadConfig(Map("readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(sparkSession)))
val df5 = sparkSession.read.mongo(customReadConfig)
val df6 = sparkSession.read.format("mongo").options(customReadConfig.asOptions).load()
Filters
Note
When using filters with DataFrames or Spark SQL, the underlying Mongo Connector code constructs an aggregation pipeline to filter the data in MongoDB before sending it to Spark.
The following example filters and outputs the characters with ages under 100:
df.filter(df("age") < 100).show()
The operation outputs the following:
+--------------------+---+-------------+
|                 _id|age|         name|
+--------------------+---+-------------+
|[5755d7b4566878c9...| 50|Bilbo Baggins|
|[5755d7b4566878c9...| 82|         Fíli|
|[5755d7b4566878c9...| 77|         Kíli|
+--------------------+---+-------------+
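If you want to confirm where the filtering happens, Spark's standard explain() call prints the query plan; the age predicate should appear as a filter pushed to the MongoDB relation rather than applied only on the Spark side (exact plan output varies by Spark version):

// Print the extended query plan to inspect the pushed-down filter
df.filter(df("age") < 100).explain(true)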
Explicitly Declare a Schema
By default, reading from MongoDB in a SparkSession infers the schema by sampling documents from the collection. You can also use a case class to define the schema explicitly, thus removing the extra queries needed for sampling.
Note
If you provide a case class for the schema, MongoDB returns only the declared fields. This helps minimize the data sent across the wire.
The following statement creates a Character case class and then uses it to define the schema for the DataFrame:
case class Character(name: String, age: Int)
Important
For self-contained Scala applications, the Character class should be defined outside of the method that uses it.
val explicitDF = MongoSpark.load[Character](sparkSession)
explicitDF.printSchema()
The operation prints the following output:
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)
Convert to Dataset
You can use the case class when converting the DataFrame to a Dataset, as in the following example:
val dataset = explicitDF.as[Character]
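With a typed Dataset you can work against the Character fields directly in lambdas. A small illustration, not part of the original page:

// Typed operations compile against the fields of the Character case class
dataset.filter(character => character.age < 100).show()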
Convert RDD to DataFrame and Dataset
The MongoRDD class provides helpers to convert an RDD to DataFrames and Datasets. The following example passes a SparkContext object to MongoSpark.load(), which returns an RDD, and then converts it:
// Passing the SparkContext to load returns an RDD, not a DataFrame or Dataset
val rdd = MongoSpark.load(sparkSession.sparkContext)

val dfInferredSchema = rdd.toDF()
val dfExplicitSchema = rdd.toDF[Character]()
val ds = rdd.toDS[Character]()
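Because this load happens at the RDD level, any filtering you want performed in MongoDB has to be supplied as an aggregation pipeline on the RDD itself. A minimal sketch, assuming MongoRDD's withPipeline helper:

// Apply a $match stage in MongoDB before the data reaches Spark,
// then convert the filtered RDD to a typed DataFrame
val filteredRdd = rdd.withPipeline(Seq(Document.parse("{ $match: { age: { $gte: 100 } } }")))
val filteredDF = filteredRdd.toDF[Character]()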
SQL Queries
Before running SQL queries on your dataset, you must register a temporary view for the dataset.
The following operation registers a characters table and then queries it to find all characters that are 100 or older:
val characters = MongoSpark.load[Character](sparkSession)
characters.createOrReplaceTempView("characters")

val centenarians = sparkSession.sql("SELECT name, age FROM characters WHERE age >= 100")
centenarians.show()
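Any Spark SQL that is valid against the registered view works the same way. For instance, an aggregate over the sample data (an illustrative query, not part of the original example):

// Average age across all characters in the temporary view
val averageAge = sparkSession.sql("SELECT AVG(age) AS avg_age FROM characters")
averageAge.show()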
Save DataFrames to MongoDB
The MongoDB Spark Connector provides the ability to persist DataFrames to a collection in MongoDB.
The following example uses the MongoSpark.save(DataFrameWriter) method to save the centenarians DataFrame into the hundredClub collection in MongoDB and, to verify the save, reads from the hundredClub collection:
MongoSpark.save(centenarians.write.option("collection", "hundredClub").mode("overwrite"))

println("Reading from the 'hundredClub' collection:")

MongoSpark.load[Character](sparkSession, ReadConfig(Map("collection" -> "hundredClub"), Some(ReadConfig(sparkSession)))).show()
The DataFrameWriter includes the .mode("overwrite") option to drop the hundredClub collection before writing the results, if the collection already exists.
In the Spark Shell, the operation prints the following output:
+-------+----+
|   name| age|
+-------+----+
|Gandalf|1000|
| Thorin| 195|
|  Balin| 178|
| Dwalin| 169|
|    Óin| 167|
|  Glóin| 158|
+-------+----+
MongoSpark.save(dataFrameWriter) is shorthand for configuring and saving via the DataFrameWriter. The following examples write DataFrames to MongoDB using the DataFrameWriter directly:
centenarians.write.option("collection", "hundredClub").mode("overwrite").mongo()
centenarians.write.option("collection", "hundredClub").mode("overwrite").format("mongo").save()
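If you prefer to keep write settings in a configuration object rather than on the writer, a WriteConfig can be passed to MongoSpark.save. A sketch under that assumption (the writeConcern.w value shown is illustrative):

import com.mongodb.spark.config.WriteConfig

// Build a WriteConfig targeting the hundredClub collection, inheriting the
// remaining settings from the SparkSession configuration
val writeConfig = WriteConfig(Map("collection" -> "hundredClub", "writeConcern.w" -> "majority"), Some(WriteConfig(sparkSession)))
MongoSpark.save(centenarians, writeConfig)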
DataTypes
Spark supports a limited number of data types. To ensure that all BSON types can be round-tripped in and out of Spark DataFrames/Datasets, the Spark Connector creates custom StructTypes for any unsupported BSON types.
The following table shows the mapping between the BSON Types and Spark Types:
BSON Type | Spark Type |
---|---|
Document | StructType |
Array | ArrayType |
32-bit integer | Integer |
64-bit integer | Long |
Binary data | Array[Byte] or StructType : { subType: Byte, data: Array[Byte]} |
Boolean | Boolean |
Date | java.sql.Timestamp |
DBPointer | StructType : { ref: String , oid: String} |
Double | Double |
JavaScript | StructType : { code: String } |
JavaScript with scope | StructType : { code: String , scope: String } |
Max key | StructType : { maxKey: Integer } |
Min key | StructType : { minKey: Integer } |
Null | null |
ObjectId | StructType : { oid: String } |
Regular Expression | StructType : { regex: String , options: String } |
String | String |
Symbol | StructType : { symbol: String } |
Timestamp | StructType : { time: Integer , inc: Integer } |
Undefined | StructType : { undefined: Boolean } |
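As the table shows, an ObjectId surfaces in Spark as a struct with a single oid string field, so the hex string is reachable through ordinary nested-column access. For example, with the df DataFrame from earlier:

// Pull the ObjectId hex string out of the nested _id struct column
df.select(df("_id.oid").alias("id"), df("name")).show()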
Dataset support
To help better support Datasets, the following Scala case classes (com.mongodb.spark.sql.fieldTypes) and JavaBean classes (com.mongodb.spark.sql.fieldTypes.api.java) have been created to represent the unsupported BSON Types:
BSON Type | Scala case class | JavaBean |
---|---|---|
Binary data | Binary | Binary |
DBPointer | DBPointer | DBPointer |
JavaScript | JavaScript | JavaScript |
JavaScript with scope | JavaScriptWithScope | JavaScriptWithScope |
Max key | MaxKey | MaxKey |
Min key | MinKey | MinKey |
ObjectId | ObjectId | ObjectId |
Regular Expression | RegularExpression | RegularExpression |
Symbol | Symbol | Symbol |
Timestamp | Timestamp | Timestamp |
Undefined | Undefined | Undefined |
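For example, the fieldTypes ObjectId case class lets a schema case class carry the document's _id. A minimal sketch (the CharacterWithId name is illustrative, not from the original page):

import com.mongodb.spark.sql.fieldTypes.ObjectId

// Include _id in the schema by using the connector's ObjectId field type
case class CharacterWithId(_id: ObjectId, name: String, age: Int)

val charactersWithIds = MongoSpark.load[CharacterWithId](sparkSession)
charactersWithIds.printSchema()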
For convenience, all BSON Types can also be represented as String values. However, these values lose their original type information and, if saved back to MongoDB, are stored as Strings.