Aggregation
Use MongoDB's aggregation pipeline to apply filtering rules and perform aggregation operations when reading data from MongoDB into Spark.
Consider a collection named fruit that contains the following documents:
{ "_id" : 1, "type" : "apple", "qty" : 5 } { "_id" : 2, "type" : "orange", "qty" : 10 } { "_id" : 3, "type" : "banana", "qty" : 15 }
Chain the option() method to spark.read() from within the pyspark shell to specify an aggregation pipeline to use when creating a DataFrame.
pipeline = "{'$match': {'type': 'apple'}}"
df = spark.read.format("mongo").option("pipeline", pipeline).load()
df.show()
In the pyspark shell, the operation prints the following output:
+---+---+-----+
|_id|qty| type|
+---+---+-----+
|1.0|5.0|apple|
+---+---+-----+
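The pipeline option also accepts multiple aggregation stages supplied as a JSON array string. The following is a minimal sketch, assuming the same fruit collection and connection settings as above; the stage contents are illustrative, not taken from the original page.

# A hedged sketch: passing a multi-stage pipeline as a JSON array string.
# Assumes the same SparkSession and connection settings as the example above.
pipeline = "[{'$match': {'qty': {'$gte': 10}}}, {'$sort': {'qty': -1}}]"
df = spark.read.format("mongo").option("pipeline", pipeline).load()
df.show()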