Docs Menu
Docs Home
/ / /
Java Reactive Streams Driver

Aggregation Framework

On this page

  • Prerequisites
  • Connect to a MongoDB Deployment
  • Perform Aggregation
  • Use Aggregation Expressions
  • Explain an Aggregation

The aggregation pipeline is a framework for data aggregation, modeled on the concept of data processing pipelines.

To learn more about aggregation, see Aggregation Pipeline in the Server manual.

You must set up the following components to run the code examples in this guide:

  • A test.restaurants collection populated with documents from the restaurants.json file in the documentation assets GitHub.

  • The following import statements:

import com.mongodb.reactivestreams.client.MongoClients;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.MongoDatabase;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Accumulators;
import com.mongodb.client.model.Projections;
import com.mongodb.client.model.Filters;
import org.bson.Document;

Important

This guide uses custom Subscriber implementations, which are described in the Sample Custom Subscriber Implementations guide.

First, connect to a MongoDB deployment, then declare and define MongoDatabase and MongoCollection instances.

The following code connects to a standalone MongoDB deployment running on localhost on port 27017. Then, it defines the database variable to refer to the test database and the collection variable to refer to the restaurants collection:

MongoClient mongoClient = MongoClients.create();
MongoDatabase database = mongoClient.getDatabase("test");
MongoCollection<Document> collection = database.getCollection("restaurants");

To learn more about connecting to MongoDB deployments, see the Connect to MongoDB tutorial.

To perform aggregation, pass a list of aggregation stages to the MongoCollection.aggregate() method. The driver provides the Aggregates helper class that contains builders for aggregation stages.

In this example, the aggregation pipeline performs the following tasks:

  • Uses a $match stage to filter for documents in which the categories array field contains the element "Bakery". The example uses Aggregates.match() to build the $match stage.

  • Uses a $group stage to group the matching documents by the stars field, accumulating a count of documents for each distinct value of stars. The example uses Aggregates.group() to build the $group stage and Accumulators.sum() to build the accumulator expression. For the accumulator expressions for use within the $group stage, the driver provides Accumulators helper class.

collection.aggregate(
Arrays.asList(
Aggregates.match(Filters.eq("categories", "Bakery")),
Aggregates.group("$stars", Accumulators.sum("count", 1))
)
).subscribe(new PrintDocumentSubscriber());

For $group accumulator expressions, the driver provides the Accumulators helper class. For other aggregation expressions, manually build the expression by using the Document class.

In the following example, the aggregation pipeline uses a $project stage to return only the name field and the calculated field firstCategory whose value is the first element in the categories array. The example uses Aggregates.project() and various Projections class methods to build the $project stage:

collection.aggregate(
Arrays.asList(
Aggregates.project(
Projections.fields(
Projections.excludeId(),
Projections.include("name"),
Projections.computed(
"firstCategory",
new Document("$arrayElemAt", Arrays.asList("$categories", 0))
)
)
)
)
).subscribe(new PrintDocumentSubscriber());

To $explain an aggregation pipeline, call the AggregatePublisher.explain() method:

collection.aggregate(
Arrays.asList(
Aggregates.match(Filters.eq("categories", "Bakery")),
Aggregates.group("$stars", Accumulators.sum("count", 1))))
.explain()
.subscribe(new PrintDocumentSubscriber());

Back

Optimize Queries by Using Indexes