Java Aggregation Expression Builders in MongoDB
MongoDB aggregation pipelines allow developers to create rich document retrieval, manipulation, and update processes expressed as a sequence — or pipeline — of composable stages, where the output of one stage becomes the input to the next stage in the pipeline.
With aggregation operations, it is possible to:
- Group values from multiple documents together.
- Reshape documents.
- Perform aggregation operations on the grouped data to return a single result.
- Apply specialized operations to documents such as geographical functions, full text search, and time-window functions.
- Analyze data changes over time.
The aggregation framework has grown since its introduction in MongoDB version 2.2 to — as of version 6.1 — cover over 35 different stages and over 130 different operators.
Working with the MongoDB shell or in tools such as MongoDB Compass, aggregation pipelines are defined as an array of BSON[1] objects, with each object defining a stage in the pipeline. In an online-store system, a simple pipeline to find all orders placed between January 1st 2023 and March 31st 2023, and then provide a count of those orders grouped by product type, might look like:
    db.orders.aggregate([
      {
        $match: {
          // Both bounds sit under a single orderDate key; repeating the key
          // would make the second condition silently replace the first.
          orderDate: {
            $gte: ISODate("2023-01-01"),
            $lte: ISODate("2023-03-31"),
          },
        },
      },
      {
        $group: {
          _id: "$productType",
          count: {
            $sum: 1
          },
        },
      },
    ])
Expressions give aggregation pipeline stages their ability to manipulate data. They come in four forms:
- Operators: expressed as objects with a dollar-sign prefix followed by the name of the operator. In the example above, {$sum : 1} is an example of an operator incrementing the count of orders for each product type by 1 each time a new order for a product type is found.
- Field Paths: expressed as strings with a dollar-sign prefix, followed by the field’s path. In the case of embedded objects or arrays, dot-notation can be used to provide the path to the embedded item. In the example above, "$productType" is a field path.
- Variables: expressed with a double dollar-sign prefix, variables can be system or user defined. For example, "$$NOW" returns the current datetime value.
- Literal Values: In the example above, the literal value ‘1’ in {$sum : 1} can be considered an expression and could be replaced with, for example, a field path expression.
In Java applications using the MongoDB native drivers, aggregation pipelines can be defined and executed by directly building equivalent BSON document objects. Our example pipeline above might look like the following when being built in Java using this approach:
    …
    MongoDatabase database = mongoClient.getDatabase("Mighty_Products");
    MongoCollection<Document> collection = database.getCollection("orders");

    SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");

    // Both bounds are appended to the same orderDate sub-document; appending
    // "orderDate" twice would overwrite the $gte condition with the $lte one.
    Bson matchStage = new Document("$match",
            new Document("orderDate",
                    new Document("$gte", formatter.parse("2023-01-01"))
                            .append("$lte", formatter.parse("2023-03-31"))));

    Bson groupStage = new Document("$group",
            new Document("_id", "$productType")
                    .append("count",
                            new Document("$sum", 1L)));

    collection.aggregate(
            Arrays.asList(
                    matchStage,
                    groupStage
            )
    ).forEach(doc -> System.out.println(doc.toJson()));
The Java code above is perfectly functional and will execute as intended, but it does highlight a few issues:
- When creating the code, we had to understand the format of the corresponding BSON documents. We were not able to utilize IDE features such as code completion and discovery.
- Any mistakes in the formatting of the documents being created, or in the parameters and data types being passed to their various operators, would not be identified until we actually try to run the code.
- Although our example above is relatively simple, in more complex pipelines, the level of indentation and nesting required in the corresponding document building code can lead to readability issues.
As an alternative to building BSON document objects, the MongoDB Java driver also defines a set of “builder” classes with static utility methods to simplify the execution of many operations in MongoDB, including the creation and execution of aggregation pipeline stages. Using the builder classes allows developers to discover more errors at compile rather than run time and to use code discovery and completion features in IDEs. Recent versions of the Java driver have additionally added extended support for expression operators when using the aggregation builder classes, allowing pipelines to be written with typesafe methods and using fluent coding patterns.
Using this approach, the above code could be written as:
    MongoDatabase database = mongoClient.getDatabase("Mighty_Products");
    MongoCollection<Document> collection = database.getCollection("orders");

    var orderDate = current().getDate("orderDate");
    // Instant.parse() requires a full ISO-8601 instant, not a bare date
    Bson matchStage = match(expr(orderDate.gte(of(Instant.parse("2023-01-01T00:00:00Z")))
            .and(orderDate.lte(of(Instant.parse("2023-03-31T00:00:00Z"))))));

    Bson groupStage = group(current().getString("productType"), sum("count", 1L));

    collection.aggregate(
            Arrays.asList(
                    matchStage,
                    groupStage
            )
    ).forEach(doc -> System.out.println(doc.toJson()));
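One detail worth calling out in the builder version is that Instant.parse() expects a complete ISO-8601 instant (date, time, and a "Z" suffix) rather than a bare date. The following sketch uses only java.time and is not part of the original example; it shows one possible way to derive the bounds for the first quarter of 2023. It assumes the order dates are stored in UTC, and the class and method names are hypothetical.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

// Hypothetical helper: turns calendar dates into Instant bounds for a date-range match.
class OrderDateBounds {

    // Midnight at the start of the given ISO date (for example "2023-01-01"), assuming UTC.
    static Instant startOfDayUtc(String isoDate) {
        return LocalDate.parse(isoDate).atStartOfDay(ZoneOffset.UTC).toInstant();
    }

    public static void main(String[] args) {
        Instant from = startOfDayUtc("2023-01-01");

        // Exclusive upper bound: midnight after the last day of the range, so an order
        // placed at any time on March 31st is still earlier than this value.
        Instant toExclusive = LocalDate.parse("2023-03-31").plusDays(1)
                .atStartOfDay(ZoneOffset.UTC).toInstant();

        System.out.println(from);        // 2023-01-01T00:00:00Z
        System.out.println(toExclusive); // 2023-04-01T00:00:00Z
    }
}
```

With an exclusive upper bound like this, the comparison in the match stage would use lt() rather than lte(). Whether that is preferable depends on whether orders placed during the final day should be counted.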
In the rest of this article, we’ll walk through an example aggregation pipeline using the aggregation builder classes and methods and highlight some of the new aggregation expression operator support.
Our aggregation pipeline example is based on a database collecting and analyzing Air Traffic Control data transmitted by aircraft flying in and out of Denver International Airport. The data is collected by a receiver built from a Raspberry Pi and USB Software Defined Radios (SDRs), running software from the rather excellent Stratux open-source project.
These cheap-to-build receivers have become popular with pilots of light aircraft in recent years, as they allow the location of nearby aircraft to be shown within the map display of tablet and smartphone-based navigation applications such as ForeFlight, helping to avoid mid-air collisions.
In our application, the data received from the Stratux receiver is combined with aircraft reference data from the OpenSky Network to give us documents that look like this:
    {
      "_id": { "$numberLong": "11262117" },
      "model": "B737",
      "tailNum": "N8620H",
      "positionReports": [
        {
          "callsign": "SWA962",
          "alt": { "$numberLong": "12625" },
          "lat": { "$numberDecimal": "39.782833" },
          "lng": { "$numberDecimal": "-104.49988" },
          "speed": { "$numberLong": "283" },
          "track": { "$numberLong": "345" },
          "vvel": { "$numberLong": "-1344" },
          "timestamp": { "$date": "2023-01-31T23:28:26.294Z" }
        },
        {
          "callsign": "SWA962",
          "alt": { "$numberLong": "12600" },
          "lat": { "$numberDecimal": "39.784744" },
          "lng": { "$numberDecimal": "-104.50058" },
          "speed": { "$numberLong": "283" },
          "track": { "$numberLong": "345" },
          "vvel": { "$numberLong": "-1344" },
          "timestamp": { "$date": "2023-01-31T23:28:26.419Z" }
        },
        {
          "callsign": "SWA962",
          "alt": { "$numberLong": "12600" },
          "lat": { "$numberDecimal": "39.78511" },
          "lng": { "$numberDecimal": "-104.50071" },
          "speed": { "$numberLong": "283" },
          "track": { "$numberLong": "345" },
          "vvel": { "$numberLong": "-1344" },
          "timestamp": { "$date": "2023-01-31T23:28:26.955Z" }
        }
      ]
    }
The “tailNum” field provides the unique registration number of the aircraft and doesn’t change between position reports. The position reports are in an array[2], with each entry giving the geographical coordinates of the aircraft, its altitude, speed (horizontal and vertical), heading, and a timestamp. The position reports also give the callsign of the flight the aircraft was operating at the time it broadcast the position report. This can vary if the aircraft’s position reports were picked up as it flew into Denver, and then again later as it flew out of Denver operating a different flight. In the sample above, aircraft N8620H, a Boeing 737, was operating flight SWA962 — a Southwest Airlines flight. It was flying at a speed of 283 knots, on a heading of 345 degrees, descending through 12,600 feet at 1344 ft/minute.
Using data collected over a 36-hour period, our collection contains information on over 500 different aircraft and over half a million position reports. We want to build an aggregation pipeline that will show the number of different aircraft operated by United Airlines grouped by aircraft type.
The aggregation pipeline that we will run on our data will consist of three stages:
The first — a match stage — will find all aircraft that transmitted a United Airlines callsign between two dates.
Next, we will carry out a group stage that takes the aircraft documents found by the match stage and creates a new set of documents — one for each model of aircraft found during the match stage, with each document containing a list of all the tail numbers of aircraft of that type found during the match stage.
Finally, we carry out a project stage which is used to reshape the data in each document into our final desired format.
A match stage carries out a query to filter the documents being passed to the next stage in the pipeline. A match stage is typically used as one of the first stages in the pipeline in order to keep the number of documents the pipeline has to work with — and therefore its memory footprint — to a reasonable size.
In our pipeline, the match stage will select all aircraft documents containing at least one position report with a United Airlines callsign (United callsigns all start with the three-letter prefix “UAL”) and a timestamp falling within a selected date range. The BSON representation of the resulting pipeline stage looks like:
    {
      $match: {
        positionReports: {
          $elemMatch: {
            callsign: /^UAL/,
            $and: [
              {
                timestamp: {
                  $gte: ISODate("2023-01-31T12:00:00.000-07:00")
                }
              },
              {
                timestamp: {
                  $lt: ISODate("2023-02-01T00:00:00.000-07:00")
                }
              }
            ]
          }
        }
      }
    }
The $elemMatch operator specifies that the query criteria we provide must all occur within a single entry in an array to generate a match, so an aircraft document will only match if it contains at least one position report where the callsign starts with “UAL” and the timestamp is between 12:00 on January 31st and 00:00 on February 1st in the Mountain time zone.
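To make the "single entry" requirement concrete, the following plain-Java sketch mimics the two behaviors on an in-memory list. It is an illustration only and does not use the MongoDB driver; the Report class, the method names, and the "UAL123" callsign are assumptions made for this example.

```java
import java.time.Instant;
import java.util.Arrays;
import java.util.List;

// Illustration only: in-memory stand-in for an aircraft's positionReports array.
class ElemMatchSketch {

    // Minimal hypothetical position report: just the two fields the match stage inspects.
    static final class Report {
        final String callsign;
        final Instant timestamp;

        Report(String callsign, String isoTimestamp) {
            this.callsign = callsign;
            this.timestamp = Instant.parse(isoTimestamp);
        }
    }

    // $elemMatch-style check: one single report must satisfy every condition.
    static boolean anySingleReportMatches(List<Report> reports, Instant from, Instant to) {
        return reports.stream().anyMatch(r ->
                r.callsign.startsWith("UAL")
                        && !r.timestamp.isBefore(from)   // timestamp >= from
                        && r.timestamp.isBefore(to));    // timestamp < to
    }

    // Looser check: each condition may be satisfied by a different report.
    static boolean conditionsMatchSeparately(List<Report> reports, Instant from, Instant to) {
        boolean callsignSeen = reports.stream().anyMatch(r -> r.callsign.startsWith("UAL"));
        boolean inRangeSeen = reports.stream().anyMatch(r ->
                !r.timestamp.isBefore(from) && r.timestamp.isBefore(to));
        return callsignSeen && inRangeSeen;
    }

    public static void main(String[] args) {
        Instant from = Instant.parse("2023-01-31T19:00:00Z"); // 12:00 at UTC-07:00
        Instant to = Instant.parse("2023-02-01T07:00:00Z");   // 00:00 at UTC-07:00

        // A United callsign outside the window, plus a Southwest callsign inside it.
        List<Report> reports = Arrays.asList(
                new Report("UAL123", "2023-01-30T10:00:00Z"),
                new Report("SWA962", "2023-01-31T23:28:26.294Z"));

        System.out.println(anySingleReportMatches(reports, from, to));    // false
        System.out.println(conditionsMatchSeparately(reports, from, to)); // true
    }
}
```

The aircraft in this sample would not be selected by the match stage, because no single position report has both a United callsign and a timestamp inside the window.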
In Java, after using either Maven or Gradle to add the MongoDB Java drivers as a dependency within our project, we could define this stage by building an equivalent BSON document object:
    //Create the from and to dates for the match stage
    // OffsetDateTime.parse() accepts the -07:00 offset on every Java version;
    // DateTimeFormatter.ISO_INSTANT rejects non-"Z" offsets on older JDKs.
    String sFromDate = "2023-01-31T12:00:00.000-07:00";
    Instant fromInstant = OffsetDateTime.parse(sFromDate).toInstant();
    Date fromDate = Date.from(fromInstant);

    String sToDate = "2023-02-01T00:00:00.000-07:00";
    Instant toInstant = OffsetDateTime.parse(sToDate).toInstant();
    Date toDate = Date.from(toInstant);

    Document matchStage = new Document("$match",
            new Document("positionReports",
                    new Document("$elemMatch",
                            new Document("callsign", Pattern.compile("^UAL"))
                                    .append("$and", Arrays.asList(
                                            new Document("timestamp", new Document("$gte", fromDate)),
                                            new Document("timestamp", new Document("$lt", toDate))
                                    ))
                    )
            )
    );
As we saw with the earlier online store example, whilst this code is perfectly functional, we did need to understand the structure of the corresponding BSON document, and any mistakes we made in constructing it would only be discovered at run-time.
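The date strings in this stage carry a -07:00 offset, so it can help to see which UTC instants they resolve to. The sketch below uses OffsetDateTime.parse() from java.time, which is one way to read a timestamp with an explicit offset; the class and method names are hypothetical and only the JDK is required.

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.util.Date;

// Hypothetical helper showing how the offset date strings map to UTC instants.
class MatchWindow {

    // Parses an ISO-8601 timestamp that carries an explicit offset such as -07:00.
    static Instant toInstant(String isoOffsetDateTime) {
        return OffsetDateTime.parse(isoOffsetDateTime).toInstant();
    }

    public static void main(String[] args) {
        Instant from = toInstant("2023-01-31T12:00:00.000-07:00");
        Instant to = toInstant("2023-02-01T00:00:00.000-07:00");

        System.out.println(from); // 2023-01-31T19:00:00Z
        System.out.println(to);   // 2023-02-01T07:00:00Z

        // The Document-based stage passes java.util.Date values, which wrap the same instant.
        Date fromDate = Date.from(from);
        System.out.println(fromDate.toInstant().equals(from)); // true
    }
}
```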
As an alternative, after adding the necessary import statements to give our code access to the aggregation builder and expression operator static methods, we can build an equivalent pipeline stage with the following code:
    import static com.mongodb.client.model.Aggregates.*;
    import static com.mongodb.client.model.Filters.*;
    import static com.mongodb.client.model.Projections.*;
    import static com.mongodb.client.model.Accumulators.*;
    import static com.mongodb.client.model.mql.MqlValues.*;
    import com.mongodb.client.model.mql.MqlDocument;
    //...

    //Create the from and to dates for the match stage
    // OffsetDateTime.parse() accepts the -07:00 offset on every Java version
    String sFromDate = "2023-01-31T12:00:00.000-07:00";
    Instant fromInstant = OffsetDateTime.parse(sFromDate).toInstant();

    String sToDate = "2023-02-01T00:00:00.000-07:00";
    Instant toInstant = OffsetDateTime.parse(sToDate).toInstant();

    var positionReports = current().<MqlDocument>getArray("positionReports");
    Bson matchStage = match(expr(
            positionReports.any(positionReport -> {
                var callsign = positionReport.getString("callsign");
                var ts = positionReport.getDate("timestamp");
                return callsign
                        .substr(0,3)
                        .eq(of("UAL"))
                        .and(ts.gte(of(fromInstant)))
                        .and(ts.lt(of(toInstant)));
            })
    ));
There are a few things worth noting in this code:
Firstly, the expression operators framework gives us access to a method, current(), which returns the document currently being processed by the aggregation pipeline. We use it initially to get the array of position reports from the current document.
Next, although we’re using the match() aggregation builder method to create our match stage, we’ve used the expr()[3] filter builder method inside it to better demonstrate the expression operators framework and its associated coding style. The expression uses the any() array expression operator to iterate through each entry in the positionReports array, looking for any that matches our predicate, i.e., one that has a callsign field starting with the letters “UAL” and a timestamp falling within our specified date/time range. This is equivalent to what the $elemMatch operator in our original BSON document-based pipeline stage was doing.
Also, when using the expression operators to retrieve fields, we’ve used type-specific methods to indicate the type of the expected return value. callsign was retrieved using getString(), while the timestamp variable ts was retrieved using getDate(). This allows IDEs such as IntelliJ and Visual Studio Code to perform type checking, and for subsequent code completion to be tailored to only show methods and documentation relevant to the returned type. This can lead to faster and less error-prone coding.
Finally, note that in building the predicate for the any() expression operator, we’ve used a fluent coding style and idiomatic coding elements, such as lambdas, that many Java developers will be familiar with and more comfortable using than the MongoDB-specific approach needed to directly build BSON documents.
Having filtered our document list to only include aircraft operated by United Airlines in our match stage, in the second stage of the pipeline, we carry out a group operation to begin the task of counting the number of aircraft of each model. The BSON document for this stage looks like:
    {
      $group:
        {
          _id: "$model",
          aircraftSet: {
            $addToSet: "$tailNum",
          },
        },
    }
In this stage, we are specifying that we want to group the document data by the “model” field and that in each resulting document, we want an array called “aircraftSet” containing each unique tail number of observed aircraft of that model type. The documents output from this stage look like:
    {
      "_id": "B757",
      "aircraftSet": [
        "N74856",
        "N77865",
        "N17104",
        "N19117",
        "N14120",
        "N57855",
        "N77871"
      ]
    }
The corresponding Java code for the stage looks like:
    Bson groupStage = group(current().getString("model"),
            addToSet("aircraftSet", current().getString("tailNum")));
As before, we’ve used the expression framework’s current() method to access the document currently being processed by the pipeline. The aggregation builders’ addToSet() accumulator method is used to ensure only unique tail numbers are added to the “aircraftSet” array.
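As a rough mental model of what this group stage computes, the following plain-Java sketch performs the same grouping on an in-memory list. It does not use the MongoDB driver; the Aircraft class, the method name, and the sample rows are assumptions for illustration (the tail numbers are borrowed from the documents shown earlier).

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Illustration only: an in-memory analogue of grouping by model with $addToSet.
class GroupSketch {

    // Hypothetical stand-in for the two fields of an aircraft document used by the stage.
    static final class Aircraft {
        final String model;
        final String tailNum;

        Aircraft(String model, String tailNum) {
            this.model = model;
            this.tailNum = tailNum;
        }
    }

    // Groups by model (the "_id" of the group stage) and collects unique tail numbers.
    static Map<String, Set<String>> tailNumbersByModel(List<Aircraft> aircraft) {
        // Sorted collections are used only to make the printed output predictable.
        Map<String, Set<String>> result = new TreeMap<>();
        for (Aircraft a : aircraft) {
            result.computeIfAbsent(a.model, key -> new TreeSet<>()).add(a.tailNum);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Aircraft> aircraft = Arrays.asList(
                new Aircraft("B757", "N74856"),
                new Aircraft("B757", "N77865"),
                new Aircraft("B757", "N74856"), // duplicate, kept only once by the set
                new Aircraft("B737", "N8620H"));

        System.out.println(tailNumbersByModel(aircraft));
        // {B737=[N8620H], B757=[N74856, N77865]}
    }
}
```

Unlike this sketch, $addToSet does not guarantee any particular ordering of the elements in the resulting array.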
In the third and final stage of our pipeline, we use a project stage to:
- Rename the “_id” field introduced by the group stage back to “model.”
- Swap the array of tail numbers for the number of entries in the array.
- Add a new field, “airline,” populating it with the literal value “United.”
- Add a field named “manufacturer” and use a $switch conditional operator to populate it with:
  - “AIRBUS” if the aircraft model starts with “A.”
  - “BOEING” if it starts with a “B.”
  - “CANADAIR” if it starts with a “C.”
  - “EMBRAER” if it starts with an “E.”
  - “MCDONNELL DOUGLAS” if it starts with an “M.”
  - “UNKNOWN” in all other cases.
The BSON document for this stage looks like:
    {
      $project: {
        airline: "United",
        model: "$_id",
        count: {
          $size: "$aircraftSet",
        },
        manufacturer: {
          $let: {
            vars: {
              manufacturerPrefix: {
                $substrBytes: ["$_id", 0, 1],
              },
            },
            in: {
              $switch: {
                branches: [
                  {
                    case: { $eq: ["$$manufacturerPrefix", "A"] },
                    then: "AIRBUS",
                  },
                  {
                    case: { $eq: ["$$manufacturerPrefix", "B"] },
                    then: "BOEING",
                  },
                  {
                    case: { $eq: ["$$manufacturerPrefix", "C"] },
                    then: "CANADAIR",
                  },
                  {
                    case: { $eq: ["$$manufacturerPrefix", "E"] },
                    then: "EMBRAER",
                  },
                  {
                    case: { $eq: ["$$manufacturerPrefix", "M"] },
                    then: "MCDONNELL DOUGLAS",
                  },
                ],
                default: "UNKNOWN",
              },
            },
          },
        },
        _id: "$$REMOVE",
      },
    }
The resulting output documents look like:
    {
      "airline": "United",
      "model": "B777",
      "count": 5,
      "manufacturer": "BOEING"
    }
The Java code for this stage looks like:
    Bson projectStage = project(fields(
            computed("airline", "United"),
            computed("model", current().getString("_id")),
            computed("count", current().<MqlDocument>getArray("aircraftSet").size()),
            computed("manufacturer", current()
                    .getString("_id")
                    .substr(0, 1)
                    .switchStringOn(s -> s
                            .eq(of("A"), (m -> of("AIRBUS")))
                            .eq(of("B"), (m -> of("BOEING")))
                            .eq(of("C"), (m -> of("CANADAIR")))
                            .eq(of("E"), (m -> of("EMBRAER")))
                            .eq(of("M"), (m -> of("MCDONNELL DOUGLAS")))
                            .defaults(m -> of("UNKNOWN"))
                    )),
            excludeId()
    ));
Note again the use of type-specific field accessor methods to get the aircraft model type (string) and aircraftSet (array of type MqlDocument). In determining the aircraft manufacturer, we’ve again used a fluent coding style, with switchStringOn() conditionally setting the value to one of the five manufacturers or to “UNKNOWN.”
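For readers more used to ordinary Java control flow, the manufacturer lookup performed by this stage can be pictured as the following plain-Java sketch. It is an analogue for illustration rather than driver code; the class and method names, the null/empty guard, and the "PC12" sample value are assumptions added here.

```java
// Illustration only: plain-Java analogue of the manufacturer lookup in the project stage.
class ManufacturerSketch {

    // Maps the first character of the model code to a manufacturer name.
    static String manufacturerFor(String model) {
        // Guard added for this sketch; the pipeline stage itself has no such check.
        if (model == null || model.isEmpty()) {
            return "UNKNOWN";
        }
        switch (model.substring(0, 1)) {
            case "A": return "AIRBUS";
            case "B": return "BOEING";
            case "C": return "CANADAIR";
            case "E": return "EMBRAER";
            case "M": return "MCDONNELL DOUGLAS";
            default:  return "UNKNOWN";
        }
    }

    public static void main(String[] args) {
        System.out.println(manufacturerFor("B737")); // BOEING
        System.out.println(manufacturerFor("A320")); // AIRBUS
        System.out.println(manufacturerFor("PC12")); // UNKNOWN
    }
}
```

The difference is where the logic runs: the builder version describes the lookup as an expression that the server evaluates for each document, whereas this sketch evaluates it in the client JVM.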
With our three pipeline stages now defined, we can run the pipeline against our collection:
    aircraftCollection.aggregate(
            Arrays.asList(
                    matchStage,
                    groupStage,
                    projectStage
            )
    ).forEach(doc -> System.out.println(doc.toJson()));
If all goes to plan, this should produce output to the console that looks like:
    {"airline": "United", "model": "B757", "count": 7, "manufacturer": "BOEING"}
    {"airline": "United", "model": "B777", "count": 5, "manufacturer": "BOEING"}
    {"airline": "United", "model": "A320", "count": 21, "manufacturer": "AIRBUS"}
    {"airline": "United", "model": "B737", "count": 45, "manufacturer": "BOEING"}
In this article, we have shown examples of how expression operators and aggregation builder methods in the latest versions of the MongoDB Java drivers can be used to construct aggregation pipelines using a fluent, idiomatic style of Java programming that can utilize autocomplete functionality in IDEs and type-safety compiler features. This can result in code that is more robust and more familiar in style to many Java developers. The use of the builder classes also places less dependence on developers having an extensive understanding of the BSON document format for aggregation pipeline stages.
More information on the use of aggregation builder and expression operator classes can be found in the official MongoDB Java Driver documentation.
The example Java code, aggregation pipeline BSON, and a JSON export of the data used in this article can be found on GitHub.
More information
[1] MongoDB uses Binary JSON (BSON) to store data and define operations. BSON is a superset of JSON, stored in binary format and allowing data types over and above those defined in the JSON standard. Get more information on BSON.
[2] It should be noted that storing the position reports in an array for each aircraft like this works well for the purposes of our example, but it’s probably not the best design for a production-grade system because, over time, the arrays for some aircraft could become excessively large. A really good discussion of massive arrays and other anti-patterns, and how to handle them, is available over at Developer Center.
[3] The use of expressions in Aggregation Pipeline Match stages can sometimes cause some confusion. For a discussion of this, and aggregations in general, Paul Done’s excellent eBook, “Practical MongoDB Aggregations,” is highly recommended.