Explore Developer Center's New Chatbot! MongoDB AI Chatbot can be accessed at the top of your navigation to answer all your MongoDB questions.

Join us at AWS re:Invent 2024! Learn how to use MongoDB for AI use cases.
MongoDB Developer
Java
plus
Sign in to follow topics
MongoDB Developer Centerchevron-right
Developer Topicschevron-right
Languageschevron-right
Javachevron-right

Java - Change Streams

Maxime Beugnet10 min read • Published Feb 01, 2022 • Updated Oct 01, 2024
MongoDBChange StreamsJava
Facebook Icontwitter iconlinkedin icon
Rate this quickstart
star-empty
star-empty
star-empty
star-empty
star-empty

Updates

The MongoDB Java quickstart repository is available on GitHub.

February 28th, 2024

  • Update to Java 21
  • Update Java Driver to 5.0.0
  • Update logback-classic to 1.2.13

November 14th, 2023

  • Update to Java 17
  • Update Java Driver to 4.11.1
  • Update mongodb-crypt to 1.8.0

March 25th, 2021

  • Update Java Driver to 4.2.2.
  • Added Client Side Field Level Encryption example.

October 21st, 2020

  • Update Java Driver to 4.1.1.
  • The Java Driver logging is now enabled via the popular SLF4J API, so I added logback in the pom.xml and a configuration file logback.xml.

Introduction

Java badge
Change Streams were introduced in MongoDB 3.6. They allow applications to access real-time data changes without the complexity and risk of tailing the oplog.
Applications can use change streams to subscribe to all data changes on a single collection, a database, or an entire deployment, and immediately react to them. Because change streams use the aggregation framework, an application can also filter for specific changes or transform the notifications at will.
In this blog post, as promised in the first blog post of this series, I will show you how to leverage MongoDB Change Streams using Java.

Getting Set Up

I will use the same repository as usual in this series. If you don't have a copy of it yet, you can clone it or just update it if you already have it:
1git clone https://github.com/mongodb-developer/java-quick-start
If you haven't yet set up your free cluster on MongoDB Atlas, now is a great time to do so. You have all the instructions in this blog post.

Change Streams

In this blog post, I will be working on the file called ChangeStreams.java, but Change Streams are super easy to work with.
I will show you 5 different examples to showcase some features of the Change Streams. For the sake of simplicity, I will only show you the pieces of code related to the Change Streams directly. You can find the entire code sample at the bottom of this blog post or in the Github repository.
For each example, you will need to start 2 Java programs in the correct order if you want to reproduce my examples.
  • The first program is always the one that contains the Change Streams code.
  • The second one will be one of the Java programs we already used in this Java blog posts series. You can find them in the Github repository. They will generate MongoDB operations that we will observe in the Change Streams output.

A simple Change Streams without filters

Let's start with the most simple Change Stream we can make:
1MongoCollection<Grade> grades = db.getCollection("grades", Grade.class);
2ChangeStreamIterable<Grade> changeStream = grades.watch();
3changeStream.forEach((Consumer<ChangeStreamDocument<Grade>>) System.out::println);
As you can see, all we need is myCollection.watch()! That's it.
This returns a ChangeStreamIterable which, as indicated by its name, can be iterated to return our change events. Here, I'm iterating over my Change Stream to print my change event documents in the Java standard output.
I can also simplify this code like this:
1grades.watch().forEach(printEvent());
2
3private static Consumer<ChangeStreamDocument<Grade>> printEvent() {
4 return System.out::println;
5}
I will reuse this functional interface in my following examples to ease the reading.
To run this example:
  • Uncomment only the example 1 from the ChangeStreams.java file and start it in your IDE or a dedicated console using Maven in the root of your project.
1mvn compile exec:java -Dexec.mainClass="com.mongodb.quickstart.ChangeStreams" -Dmongodb.uri="mongodb+srv://USERNAME:PASSWORD@cluster0-abcde.mongodb.net/test?w=majority"
  • Start MappingPOJO.java in another console or in your IDE.
1mvn compile exec:java -Dexec.mainClass="com.mongodb.quickstart.MappingPOJO" -Dmongodb.uri="mongodb+srv://USERNAME:PASSWORD@cluster0-abcde.mongodb.net/test?w=majority"
In MappingPOJO, we are doing 4 MongoDB operations:
  • I'm creating a new Grade document with the insertOne() method,
  • I'm searching for this Grade document using the find() method,
  • I'm replacing entirely this Grade using the findOneAndReplace() method,
  • and finally, I'm deleting this Grade using the deleteOne() method.
This is confirmed in the standard output from MappingJava:
1Grade inserted.
2Grade found: Grade{id=5e2b4a28c9e9d55e3d7dbacf, student_id=10003.0, class_id=10.0, scores=[Score{type='homework', score=50.0}]}
3Grade replaced: Grade{id=5e2b4a28c9e9d55e3d7dbacf, student_id=10003.0, class_id=10.0, scores=[Score{type='homework', score=50.0}, Score{type='exam', score=42.0}]}
4Grade deleted: AcknowledgedDeleteResult{deletedCount=1}
Let's check what we have in the standard output from ChangeStreams.java (prettified):
1ChangeStreamDocument{
2 operationType=OperationType{ value='insert' },
3 resumeToken={ "_data":"825E2F3E40000000012B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E2F3E400C47CF19D59361620004" },
4 namespace=sample_training.grades,
5 destinationNamespace=null,
6 fullDocument=Grade{
7 id=5e2f3e400c47cf19d5936162,
8 student_id=10003.0,
9 class_id=10.0,
10 scores=[ Score { type='homework', score=50.0 } ]
11 },
12 documentKey={ "_id":{ "$oid":"5e2f3e400c47cf19d5936162" } },
13 clusterTime=Timestamp{
14 value=6786711608069455873,
15 seconds=1580154432,
16 inc=1
17 },
18 updateDescription=null,
19 txnNumber=null,
20 lsid=null
21}
22ChangeStreamDocument{ operationType=OperationType{ value= 'replace' },
23 resumeToken={ "_data":"825E2F3E40000000032B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E2F3E400C47CF19D59361620004" },
24 namespace=sample_training.grades,
25 destinationNamespace=null,
26 fullDocument=Grade{
27 id=5e2f3e400c47cf19d5936162,
28 student_id=10003.0,
29 class_id=10.0,
30 scores=[ Score{ type='homework', score=50.0 }, Score{ type='exam', score=42.0 } ]
31 },
32 documentKey={ "_id":{ "$oid":"5e2f3e400c47cf19d5936162" } },
33 clusterTime=Timestamp{
34 value=6786711608069455875,
35 seconds=1580154432,
36 inc=3
37 },
38 updateDescription=null,
39 txnNumber=null,
40 lsid=null
41}
42ChangeStreamDocument{
43 operationType=OperationType{ value='delete' },
44 resumeToken={ "_data":"825E2F3E40000000042B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E2F3E400C47CF19D59361620004" },
45 namespace=sample_training.grades,
46 destinationNamespace=null,
47 fullDocument=null,
48 documentKey={ "_id":{ "$oid":"5e2f3e400c47cf19d5936162" } },
49 clusterTime=Timestamp{
50 value=6786711608069455876,
51 seconds=1580154432,
52 inc=4
53 },
54 updateDescription=null,
55 txnNumber=null,
56 lsid=null
57}
As you can see, only 3 operations appear in the Change Stream:
  • insert,
  • replace,
  • delete.
It was expected because the find() operation is just a reading document from MongoDB. It's not changing anything thus not generating an event in the Change Stream.
Now that we are done with the basic example, let's explore some features of the Change Streams.
Terminate the Change Stream program we started earlier and let's move on.

A simple Change Stream filtering on the operation type

Now let's do the same thing but let's imagine that we are only interested in insert and delete operations.
1List<Bson> pipeline = List.of(match(in("operationType", List.of("insert", "delete"))));
2grades.watch(pipeline).forEach(printEvent());
As you can see here, I'm using the aggregation pipeline feature of Change Streams to filter down the change events I want to process.
Uncomment the example 2 in ChangeStreams.java and execute the program followed by MappingPOJO.java, just like we did earlier.
Here are the change events I'm receiving.
1ChangeStreamDocument {operationType=OperationType {value= 'insert'},
2 resumeToken= {"_data": "825E2F4983000000012B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E2F4983CC1D2842BFF555640004"},
3 namespace=sample_training.grades,
4 destinationNamespace=null,
5 fullDocument=Grade
6 {
7 id=5e2f4983cc1d2842bff55564,
8 student_id=10003.0,
9 class_id=10.0,
10 scores= [ Score {type= 'homework', score=50.0}]
11 },
12 documentKey= {"_id": {"$oid": "5e2f4983cc1d2842bff55564" }},
13 clusterTime=Timestamp {value=6786723990460170241, seconds=1580157315, inc=1 },
14 updateDescription=null,
15 txnNumber=null,
16 lsid=null
17}
18
19ChangeStreamDocument { operationType=OperationType {value= 'delete'},
20 resumeToken= {"_data": "825E2F4983000000042B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E2F4983CC1D2842BFF555640004"},
21 namespace=sample_training.grades,
22 destinationNamespace=null,
23 fullDocument=null,
24 documentKey= {"_id": {"$oid": "5e2f4983cc1d2842bff55564"}},
25 clusterTime=Timestamp {value=6786723990460170244, seconds=1580157315, inc=4},
26 updateDescription=null,
27 txnNumber=null,
28 lsid=null
29 }
30]
This time, I'm only getting 2 events insert and delete. The replace event has been filtered out compared to the first example.

Change Stream default behavior with update operations

Same as earlier, I'm filtering my change stream to keep only the update operations this time.
1List<Bson> pipeline = List.of(match(eq("operationType", "update")));
2grades.watch(pipeline).forEach(printEvent());
This time, follow these steps.
  • uncomment the example 3 in ChangeStreams.java,
  • if you never ran Create.java, run it. We are going to use these new documents in the next step.
  • start Update.java in another console.
In your change stream console, you should see 13 update events. Here is the first one:
1ChangeStreamDocument {operationType=OperationType {value= 'update'},
2 resumeToken= {"_data": "825E2FB83E000000012B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCCE74AA51A0486763FE0004"},
3 namespace=sample_training.grades,
4 destinationNamespace=null,
5 fullDocument=null,
6 documentKey= {"_id": {"$oid": "5e27bcce74aa51a0486763fe"}},
7 clusterTime=Timestamp {value=6786845739898109953, seconds=1580185662, inc=1},
8 updateDescription=UpdateDescription {removedFields= [], updatedFields= {"comments.10": "You will learn a lot if you read the MongoDB blog!"}},
9 txnNumber=null,
10 lsid=null
11}
As you can see, we are retrieving our update operation in the updateDescription field, but we are only getting the difference with the previous version of this document.
The fullDocument field is null because, by default, MongoDB only sends the difference to avoid overloading the change stream with potentially useless information.
Let's see how we can change this behavior in the next example.

Change Stream with "Update Lookup"

For this part, uncomment the example 4 from ChangeStreams.java and execute the programs as above.
1List<Bson> pipeline = List.of(match(eq("operationType", "update")));
2grades.watch(pipeline).fullDocument(UPDATE_LOOKUP).forEach(printEvent());
I added the option UPDATE_LOOKUP this time, so we can also retrieve the entire document during an update operation.
Let's see again the first update in my change stream:
1ChangeStreamDocument {operationType=OperationType {value= 'update'},
2 resumeToken= {"_data": "825E2FBBC1000000012B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCCE74AA51A0486763FE0004"},
3 namespace=sample_training.grades,
4 destinationNamespace=null,
5 fullDocument=Grade
6 {
7 id=5e27bcce74aa51a0486763fe,
8 student_id=10002.0,
9 class_id=10.0,
10 scores=null
11 },
12 documentKey= {"_id": {"$oid": "5e27bcce74aa51a0486763fe" }},
13 clusterTime=Timestamp {value=6786849601073709057, seconds=1580186561, inc=1 },
14 updateDescription=UpdateDescription {removedFields= [], updatedFields= {"comments.11": "You will learn a lot if you read the MongoDB blog!"}},
15 txnNumber=null,
16 lsid=null
17}
Note: The Update.java program updates a made-up field "comments" that doesn't exist in my POJO Grade which represents the original schema for this collection. Thus, the field doesn't appear in the output as it's not mapped.
If I want to see this comments field, I can use a MongoCollection not mapped automatically to my Grade.java POJO.
1MongoCollection<Document> grades = db.getCollection("grades");
2List<Bson> pipeline = List.of(match(eq("operationType", "update")));
3grades.watch(pipeline).fullDocument(UPDATE_LOOKUP).forEach((Consumer<ChangeStreamDocument<Document>>) System.out::println);
Then this is what I get in my change stream:
1ChangeStreamDocument {operationType=OperationType {value= 'update'},
2 resumeToken= {"_data": "825E2FBD89000000012B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCCE74AA51A0486763FE0004"},
3 namespace=sample_training.grades,
4 destinationNamespace=null,
5 fullDocument=Document {
6 {
7 _id=5e27bcce74aa51a0486763fe,
8 class_id=10.0,
9 student_id=10002.0,
10 comments= [ You will learn a lot if you read the MongoDB blog!, [...], You will learn a lot if you read the MongoDB blog!]
11 }
12 },
13 documentKey= {"_id": {"$oid": "5e27bcce74aa51a0486763fe"}},
14 clusterTime=Timestamp {value=6786851559578796033, seconds=1580187017, inc=1},
15 updateDescription=UpdateDescription {removedFields= [], updatedFields= {"comments.13": "You will learn a lot if you read the MongoDB blog!"}},
16 txnNumber=null,
17 lsid=null
18}
I have shortened the comments field to keep it readable but it contains 14 times the same comment in my case.
The full document we are retrieving here during our update operation is the document after the update has occurred. Read more about this in our documentation.

Change Streams are resumable

In this final example 5, I have simulated an error and I'm restarting my Change Stream from a resumeToken I got from a previous operation in my Change Stream.
It's important to note that a change stream will resume itself automatically in the face of an "incident". Generally, the only reason that an application needs to restart the change stream manually from a resume token is if there is an incident in the application itself rather than the change stream (e.g. an operator has decided that the application needs to be restarted).
1private static void exampleWithResumeToken(MongoCollection<Grade> grades) {
2 List<Bson> pipeline = List.of(match(eq("operationType", "update")));
3 ChangeStreamIterable<Grade> changeStream = grades.watch(pipeline);
4 MongoChangeStreamCursor<ChangeStreamDocument<Grade>> cursor = changeStream.cursor();
5 System.out.println("==> Going through the stream a first time & record a resumeToken");
6 int indexOfOperationToRestartFrom = 5;
7 int indexOfIncident = 8;
8 int counter = 0;
9 BsonDocument resumeToken = null;
10 while (cursor.hasNext() && counter != indexOfIncident) {
11 ChangeStreamDocument<Grade> event = cursor.next();
12 if (indexOfOperationToRestartFrom == counter) {
13 resumeToken = event.getResumeToken();
14 }
15 System.out.println(event);
16 counter++;
17 }
18 System.out.println("==> Let's imagine something wrong happened and I need to restart my Change Stream.");
19 System.out.println("==> Starting from resumeToken=" + resumeToken);
20 assert resumeToken != null;
21 grades.watch(pipeline).resumeAfter(resumeToken).forEach(printEvent());
22}
For this final example, the same as earlier. Uncomment the part 5 (which is just calling the method above) and start ChangeStreams.java then Update.java.
This is the output you should get:
1==> Going through the stream a first time & record a resumeToken
2ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E2FC276000000012B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCCE74AA51A0486763FE0004"}, namespace=sample_training.grades, destinationNamespace=null, fullDocument=null, documentKey={"_id": {"$oid": "5e27bcce74aa51a0486763fe"}}, clusterTime=Timestamp{value=6786856975532556289, seconds=1580188278, inc=1}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"comments.14": "You will learn a lot if you read the MongoDB blog!"}}, txnNumber=null, lsid=null}
3ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E2FC276000000022B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCC9F94B5117D894CBBA0004"}, namespace=sample_training.grades, destinationNamespace=null, fullDocument=null, documentKey={"_id": {"$oid": "5e27bcc9f94b5117d894cbba"}}, clusterTime=Timestamp{value=6786856975532556290, seconds=1580188278, inc=2}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"comments.15": "You will learn a lot if you read the MongoDB blog!"}}, txnNumber=null, lsid=null}
4ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E2FC276000000032B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCC9F94B5117D894CBBB0004"}, namespace=sample_training.grades, destinationNamespace=null, fullDocument=null, documentKey={"_id": {"$oid": "5e27bcc9f94b5117d894cbbb"}}, clusterTime=Timestamp{value=6786856975532556291, seconds=1580188278, inc=3}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"comments.14": "You will learn a lot if you read the MongoDB blog!"}}, txnNumber=null, lsid=null}
5ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E2FC276000000042B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCC9F94B5117D894CBBC0004"}, namespace=sample_training.grades, destinationNamespace=null, fullDocument=null, documentKey={"_id": {"$oid": "5e27bcc9f94b5117d894cbbc"}}, clusterTime=Timestamp{value=6786856975532556292, seconds=1580188278, inc=4}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"comments.14": "You will learn a lot if you read the MongoDB blog!"}}, txnNumber=null, lsid=null}
6ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E2FC276000000052B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCC9F94B5117D894CBBD0004"}, namespace=sample_training.grades, destinationNamespace=null, fullDocument=null, documentKey={"_id": {"$oid": "5e27bcc9f94b5117d894cbbd"}}, clusterTime=Timestamp{value=6786856975532556293, seconds=1580188278, inc=5}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"comments.14": "You will learn a lot if you read the MongoDB blog!"}}, txnNumber=null, lsid=null}
7ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E2FC276000000062B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCC9F94B5117D894CBBE0004"}, namespace=sample_training.grades, destinationNamespace=null, fullDocument=null, documentKey={"_id": {"$oid": "5e27bcc9f94b5117d894cbbe"}}, clusterTime=Timestamp{value=6786856975532556294, seconds=1580188278, inc=6}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"comments.14": "You will learn a lot if you read the MongoDB blog!"}}, txnNumber=null, lsid=null}
8ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E2FC276000000072B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCC9F94B5117D894CBBF0004"}, namespace=sample_training.grades, destinationNamespace=null, fullDocument=null, documentKey={"_id": {"$oid": "5e27bcc9f94b5117d894cbbf"}}, clusterTime=Timestamp{value=6786856975532556295, seconds=1580188278, inc=7}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"comments.14": "You will learn a lot if you read the MongoDB blog!"}}, txnNumber=null, lsid=null}
9ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E2FC276000000082B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCC9F94B5117D894CBC00004"}, namespace=sample_training.grades, destinationNamespace=null, fullDocument=null, documentKey={"_id": {"$oid": "5e27bcc9f94b5117d894cbc0"}}, clusterTime=Timestamp{value=6786856975532556296, seconds=1580188278, inc=8}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"comments.14": "You will learn a lot if you read the MongoDB blog!"}}, txnNumber=null, lsid=null}
10==> Let's imagine something wrong happened and I need to restart my Change Stream.
11==> Starting from resumeToken={"_data": "825E2FC276000000062B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCC9F94B5117D894CBBE0004"}
12ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E2FC276000000072B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCC9F94B5117D894CBBF0004"}, namespace=sample_training.grades, destinationNamespace=null, fullDocument=null, documentKey={"_id": {"$oid": "5e27bcc9f94b5117d894cbbf"}}, clusterTime=Timestamp{value=6786856975532556295, seconds=1580188278, inc=7}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"comments.14": "You will learn a lot if you read the MongoDB blog!"}}, txnNumber=null, lsid=null}
13ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E2FC276000000082B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCC9F94B5117D894CBC00004"}, namespace=sample_training.grades, destinationNamespace=null, fullDocument=null, documentKey={"_id": {"$oid": "5e27bcc9f94b5117d894cbc0"}}, clusterTime=Timestamp{value=6786856975532556296, seconds=1580188278, inc=8}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"comments.14": "You will learn a lot if you read the MongoDB blog!"}}, txnNumber=null, lsid=null}
14ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E2FC276000000092B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCC9F94B5117D894CBC10004"}, namespace=sample_training.grades, destinationNamespace=null, fullDocument=null, documentKey={"_id": {"$oid": "5e27bcc9f94b5117d894cbc1"}}, clusterTime=Timestamp{value=6786856975532556297, seconds=1580188278, inc=9}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"comments.14": "You will learn a lot if you read the MongoDB blog!"}}, txnNumber=null, lsid=null}
15ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E2FC2760000000A2B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCC9F94B5117D894CBC20004"}, namespace=sample_training.grades, destinationNamespace=null, fullDocument=null, documentKey={"_id": {"$oid": "5e27bcc9f94b5117d894cbc2"}}, clusterTime=Timestamp{value=6786856975532556298, seconds=1580188278, inc=10}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"comments.14": "You will learn a lot if you read the MongoDB blog!"}}, txnNumber=null, lsid=null}
16ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E2FC2760000000B2B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCC9F94B5117D894CBC30004"}, namespace=sample_training.grades, destinationNamespace=null, fullDocument=null, documentKey={"_id": {"$oid": "5e27bcc9f94b5117d894cbc3"}}, clusterTime=Timestamp{value=6786856975532556299, seconds=1580188278, inc=11}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"comments.14": "You will learn a lot if you read the MongoDB blog!"}}, txnNumber=null, lsid=null}
17ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E2FC2760000000D2B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCC8F94B5117D894CBB90004"}, namespace=sample_training.grades, destinationNamespace=null, fullDocument=null, documentKey={"_id": {"$oid": "5e27bcc8f94b5117d894cbb9"}}, clusterTime=Timestamp{value=6786856975532556301, seconds=1580188278, inc=13}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"scores.0.score": 904745.0267635228, "x": 150}}, txnNumber=null, lsid=null}
18ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E2FC2760000000F2B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCC9F94B5117D894CBBA0004"}, namespace=sample_training.grades, destinationNamespace=null, fullDocument=null, documentKey={"_id": {"$oid": "5e27bcc9f94b5117d894cbba"}}, clusterTime=Timestamp{value=6786856975532556303, seconds=1580188278, inc=15}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"scores.0.score": 2126144.0353088505, "x": 150}}, txnNumber=null, lsid=null}
As you can see here, I was able to stop reading my Change Stream and, from the resumeToken I collected earlier, I can start a new Change Stream from this point in time.

Final Code

ChangeStreams.java (code):
1package com.mongodb.quickstart;
2
3import com.mongodb.ConnectionString;
4import com.mongodb.MongoClientSettings;
5import com.mongodb.client.*;
6import com.mongodb.client.model.changestream.ChangeStreamDocument;
7import com.mongodb.quickstart.models.Grade;
8import org.bson.BsonDocument;
9import org.bson.codecs.configuration.CodecRegistry;
10import org.bson.codecs.pojo.PojoCodecProvider;
11import org.bson.conversions.Bson;
12
13import java.util.List;
14import java.util.function.Consumer;
15
16import static com.mongodb.client.model.Aggregates.match;
17import static com.mongodb.client.model.Filters.eq;
18import static com.mongodb.client.model.Filters.in;
19import static com.mongodb.client.model.changestream.FullDocument.UPDATE_LOOKUP;
20import static org.bson.codecs.configuration.CodecRegistries.fromProviders;
21import static org.bson.codecs.configuration.CodecRegistries.fromRegistries;
22
23public class ChangeStreams {
24
25 public static void main(String[] args) {
26 ConnectionString connectionString = new ConnectionString(System.getProperty("mongodb.uri"));
27 CodecRegistry pojoCodecRegistry = fromProviders(PojoCodecProvider.builder().automatic(true).build());
28 CodecRegistry codecRegistry = fromRegistries(MongoClientSettings.getDefaultCodecRegistry(), pojoCodecRegistry);
29 MongoClientSettings clientSettings = MongoClientSettings.builder()
30 .applyConnectionString(connectionString)
31 .codecRegistry(codecRegistry)
32 .build();
33
34 try (MongoClient mongoClient = MongoClients.create(clientSettings)) {
35 MongoDatabase db = mongoClient.getDatabase("sample_training");
36 MongoCollection<Grade> grades = db.getCollection("grades", Grade.class);
37 List<Bson> pipeline;
38
39 // Only uncomment one example at a time. Follow instructions for each individually then kill all remaining processes.
40
41 /** => Example 1: print all the write operations.
42 * => Start "ChangeStreams" then "MappingPOJOs" to see some change events.
43 */
44 grades.watch().forEach(printEvent());
45
46 /** => Example 2: print only insert and delete operations.
47 * => Start "ChangeStreams" then "MappingPOJOs" to see some change events.
48 */
49// pipeline = List.of(match(in("operationType", List.of("insert", "delete"))));
50// grades.watch(pipeline).forEach(printEvent());
51
52 /** => Example 3: print only updates without fullDocument.
53 * => Start "ChangeStreams" then "Update" to see some change events (start "Create" before if not done earlier).
54 */
55// pipeline = List.of(match(eq("operationType", "update")));
56// grades.watch(pipeline).forEach(printEvent());
57
58 /** => Example 4: print only updates with fullDocument.
59 * => Start "ChangeStreams" then "Update" to see some change events.
60 */
61// pipeline = List.of(match(eq("operationType", "update")));
62// grades.watch(pipeline).fullDocument(UPDATE_LOOKUP).forEach(printEvent());
63
64 /**
65 * => Example 5: iterating using a cursor and a while loop + remembering a resumeToken then restart the Change Streams.
66 * => Start "ChangeStreams" then "Update" to see some change events.
67 */
68// exampleWithResumeToken(grades);
69 }
70 }
71
72 private static void exampleWithResumeToken(MongoCollection<Grade> grades) {
73 List<Bson> pipeline = List.of(match(eq("operationType", "update")));
74 ChangeStreamIterable<Grade> changeStream = grades.watch(pipeline);
75 MongoChangeStreamCursor<ChangeStreamDocument<Grade>> cursor = changeStream.cursor();
76 System.out.println("==> Going through the stream a first time & record a resumeToken");
77 int indexOfOperationToRestartFrom = 5;
78 int indexOfIncident = 8;
79 int counter = 0;
80 BsonDocument resumeToken = null;
81 while (cursor.hasNext() && counter != indexOfIncident) {
82 ChangeStreamDocument<Grade> event = cursor.next();
83 if (indexOfOperationToRestartFrom == counter) {
84 resumeToken = event.getResumeToken();
85 }
86 System.out.println(event);
87 counter++;
88 }
89 System.out.println("==> Let's imagine something wrong happened and I need to restart my Change Stream.");
90 System.out.println("==> Starting from resumeToken=" + resumeToken);
91 assert resumeToken != null;
92 grades.watch(pipeline).resumeAfter(resumeToken).forEach(printEvent());
93 }
94
95 private static Consumer<ChangeStreamDocument<Grade>> printEvent() {
96 return System.out::println;
97 }
98}
Remember to uncomment only one Change Stream example at a time.

Wrapping Up

Change Streams are very easy to use and setup in MongoDB. They are the key to any real-time processing system.
The only remaining problem here is how to get this in production correctly. Change Streams are basically an infinite loop, processing an infinite stream of events. Multiprocessing is, of course, a must-have for this kind of setup, especially if your processing time is greater than the time separating 2 events.
If you want to learn more and deepen your knowledge faster, I recommend you check out the MongoDB Java Developer Path training available for free on MongoDB University.
In the next blog post, I will show you multi-document ACID transactions in Java.

Facebook Icontwitter iconlinkedin icon
Rate this quickstart
star-empty
star-empty
star-empty
star-empty
star-empty
Related
Article

Building a Quarkus Application to Perform MongoDB Vector Search


Oct 07, 2024 | 9 min read
Article

How to Build a Search Service in Java


Apr 23, 2024 | 11 min read
Tutorial

Spring Data Unlocked: Getting Started With Java and MongoDB


Nov 11, 2024 | 5 min read
Quickstart

Introduction to MongoDB and Helidon


Nov 12, 2024 | 6 min read
Table of Contents
  • Updates