Aggregation
Pass an aggregation pipeline to a JavaMongoRDD instance to filter data and perform aggregations in MongoDB before passing documents to Spark.
The following example uses an aggregation pipeline to perform the same filter operation as the example above, filtering all documents where the test field has a value greater than 5:
package com.mongodb.spark_examples;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

import org.bson.Document;

import com.mongodb.spark.MongoSpark;
import com.mongodb.spark.rdd.api.java.JavaMongoRDD;

import static java.util.Collections.singletonList;

public final class Aggregation {

  public static void main(final String[] args) throws InterruptedException {

    SparkSession spark = SparkSession.builder()
        .master("local")
        .appName("MongoSparkConnectorIntro")
        .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
        .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
        .getOrCreate();

    // Create a JavaSparkContext using the SparkSession's SparkContext object
    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

    // Load data from MongoDB
    JavaMongoRDD<Document> rdd = MongoSpark.load(jsc);

    /* Start Example: Use aggregation to filter an RDD *************/
    JavaMongoRDD<Document> aggregatedRdd = rdd.withPipeline(
        singletonList(
            Document.parse("{ $match: { test : { $gt : 5 } } }")));
    /* End Example **************************************************/

    // Analyze data from MongoDB
    System.out.println(aggregatedRdd.count());
    System.out.println(aggregatedRdd.first().toJson());

    jsc.close();
  }
}
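You can pass a multi-stage pipeline to withPipeline in the same way. The following sketch assumes the rdd variable from the example above and an additional java.util.Arrays import; the $match and $project stages shown here are illustrative and are not part of the original sample:

// Illustrative sketch, assuming the rdd from the example above and
// import java.util.Arrays; the stages below are example stages only.
// MongoDB runs both stages before any documents are sent to Spark.
JavaMongoRDD<Document> projectedRdd = rdd.withPipeline(
    Arrays.asList(
        Document.parse("{ $match: { test : { $gt : 5 } } }"),
        Document.parse("{ $project: { _id : 0, test : 1 } }")));

System.out.println(projectedRdd.first().toJson());

Because the pipeline executes in MongoDB, only the matching, projected documents cross the network to Spark, which can reduce the amount of data the cluster has to process.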