Docs Menu

Docs HomeMongoDB Spark Connector

Aggregation

Pass an aggregation pipeline to a JavaMongoRDD instance to filter data and perform aggregations in MongoDB before passing documents to Spark.

The following example uses an aggregation pipeline to perform the same filter operation as the example above; filter all documents where the test field has a value greater than 5:

package com.mongodb.spark_examples;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.bson.Document;
import com.mongodb.spark.MongoSpark;
import com.mongodb.spark.rdd.api.java.JavaMongoRDD;
import static java.util.Collections.singletonList;
public final class Aggregation {
public static void main(final String[] args) throws InterruptedException {
SparkSession spark = SparkSession.builder()
.master("local")
.appName("MongoSparkConnectorIntro")
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
.config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
.getOrCreate();
// Create a JavaSparkContext using the SparkSession's SparkContext object
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
// Load and analyze data from MongoDB
JavaMongoRDD<Document> rdd = MongoSpark.load(jsc);
/*Start Example: Use aggregation to filter a RDD***************/
JavaMongoRDD<Document> aggregatedRdd = rdd.withPipeline(
singletonList(
Document.parse("{ $match: { test : { $gt : 5 } } }")));
/*End Example**************************************************/
// Analyze data from MongoDB
System.out.println(aggregatedRdd.count());
System.out.println(aggregatedRdd.first().toJson());
jsc.close();
}
}
←  Read from MongoDBDatasets and SQL →
Share Feedback
© 2023 MongoDB, Inc.

About

  • Careers
  • Investor Relations
  • Legal Notices
  • Privacy Notices
  • Security Information
  • Trust Center
© 2023 MongoDB, Inc.