How can I monitor updates to a specific nested document in MongoDB using change streams?

I am using MongoDB version 6.0.

I have a MongoDB document structured as follows:

```javascript
{
  "_id": "123456",
  "info": {
    "area_name": "foo",
    "type_name": "bar",
    "year": 2024
    // other fields
  },
  "state": 1
  // other fields
}
```

The info field is a nested document, and I’ve omitted several fields in the example structure above.

I want to use change streams to monitor updates to all fields within the info nested document, while ignoring changes to other fields such as state. How can I achieve this?

I am developing with Java and Spring Data MongoDB.

Here is my attempt so far:

```java
// where(...) is statically imported from org.springframework.data.mongodb.core.query.Criteria
final var options = ChangeStreamOptions.builder()
        .fullDocumentBeforeChangeLookup(FullDocumentBeforeChange.REQUIRED)
        .filter(Aggregation.newAggregation(
                Aggregation.match(
                        where("updateDescription.updatedFields.info").exists(true)
                )
        ))
        .build();
reactiveMongoTemplate.changeStream("collection_name", options, Foo.class)
        .subscribe(this::doSomething);
```

For example, when I modified the info.year field, no change event was received.

The reason appears to be that, in the ChangeStreamEvent, the key under updateDescription.updatedFields is the dotted path info.year rather than info, so the filter condition I added in ChangeStreamOptions never matches.
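Because each key in updatedFields is a literal dotted path (for example "info.year"), a filter has to inspect the key names themselves instead of navigating into info. The following is a minimal sketch, not a confirmed fix, assuming MongoDB 4.2+ (for $regexMatch) and Java 15+ (for text blocks and String.formatted). The class name InfoChangeFilter and its members are hypothetical and not part of the MongoDB driver or Spring Data MongoDB. The $match stage converts updatedFields into {k, v} pairs with $objectToArray and keeps the event if any key is info or starts with info.; it also checks removedFields. touchesInfo applies the same regex on the client side, which is handy for unit-testing the paths you expect.

```java
import java.util.Collection;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper class (not part of the MongoDB driver or Spring Data MongoDB).
public final class InfoChangeFilter {

    // A path belongs to "info" if it is exactly "info" or starts with "info.".
    // "[.]" matches a literal dot without needing backslash escapes in Java or JSON.
    public static final String INFO_PATH_REGEX = "^info([.]|$)";

    // $match stage as JSON, intended for org.bson.Document.parse(...).
    // updatedFields keys are literal dotted paths such as "info.year", so the stage
    // converts updatedFields to {k, v} pairs and tests every key k with the regex.
    // removedFields is an array of path strings, so a plain $regex query matches
    // if any element matches. $ifNull guards events that have no updateDescription.
    public static final String INFO_CHANGED_MATCH = """
            { "$match": { "$or": [
                { "updateDescription.removedFields": { "$regex": "%s" } },
                { "$expr": { "$gt": [
                    { "$size": { "$filter": {
                        "input": { "$ifNull": [
                            { "$objectToArray": "$updateDescription.updatedFields" }, [] ] },
                        "cond": { "$regexMatch": { "input": "$$this.k", "regex": "%s" } }
                    } } },
                    0
                ] } }
            ] } }
            """.formatted(INFO_PATH_REGEX, INFO_PATH_REGEX);

    private static final Pattern INFO_PATH = Pattern.compile(INFO_PATH_REGEX);

    private InfoChangeFilter() {
    }

    // Client-side mirror of the stage above: true if any updated or removed path
    // lies inside "info". Useful for unit tests or as an extra check in a listener.
    public static boolean touchesInfo(Collection<String> updatedPaths, Collection<String> removedPaths) {
        return updatedPaths.stream().anyMatch(p -> INFO_PATH.matcher(p).find())
                || removedPaths.stream().anyMatch(p -> INFO_PATH.matcher(p).find());
    }

    public static void main(String[] args) {
        System.out.println(INFO_CHANGED_MATCH);
        System.out.println(touchesInfo(List.of("info.year"), List.of())); // true
        System.out.println(touchesInfo(List.of("state"), List.of()));     // false
    }
}
```

With the Java driver, the parsed stage can be passed directly to watch, e.g. collection.watch(List.of(Document.parse(InfoChangeFilter.INFO_CHANGED_MATCH))). With Spring Data MongoDB, the ChangeStreamOptions.builder().filter(Document...) overload should accept the same parsed Document, though this is worth checking against your Spring Data version. The $ifNull guard matters because for inserts, deletes, and replaces $objectToArray returns null, and $size raises an error on a non-array argument. Replace events carry no updateDescription, so this stage does not match them at all.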

Hi @Charles and welcome to the MongoDB community forum.

Based on the sample data you shared, I inserted the following documents into the collection:

```
Atlas atlas-99o4e9-shard-0 [primary] test> db.changeStreams.find()
[
  {
    _id: '123456',
    info: {
      area_name: 'foo',
      type_name: 'bar',
      year: 2024,
      description: 'Sample area foo with type bar'
    },
    state: 1
  },
  {
    _id: '789012',
    info: {
      area_name: 'baz',
      type_name: 'qux',
      year: 2025,
      description: 'Sample area baz with type qux'
    },
    state: 2
  },
  {
    _id: '345678',
    info: {
      area_name: 'quux',
      type_name: 'corge',
      year: 2026,
      description: 'Sample area quux with type corge'
    },
    state: 3
  }
]
```

and used the following method:

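```java
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import java.util.Arrays;

@Component
public class ChangeStreamListener implements CommandLineRunner {

    private static final Logger logger = LoggerFactory.getLogger(ChangeStreamListener.class);
    private final MongoClient mongoClient;

    public ChangeStreamListener(MongoClient mongoClient) {
        this.mongoClient = mongoClient;
    }

    @Override
    public void run(String... args) throws Exception {
        logger.info("Starting ChangeStreamListener...");

        MongoCollection<Document> collection = mongoClient.getDatabase("test").getCollection("changeStreams");

        collection.watch(Arrays.asList(
                        // Keep only update events whose updatedFields contain a field named "info"
                        Aggregates.match(
                                Filters.and(
                                        Filters.eq("operationType", "update"),
                                        Filters.exists("updateDescription.updatedFields.info")
                                )
                        ),
                        // Keep only the resume token (_id) and fullDocument.info in each event
                        Aggregates.project(new Document("fullDocument.info", 1))
                ))
                // Ask the server to attach the current version of the document to update events
                .fullDocument(FullDocument.UPDATE_LOOKUP)
                .forEach((ChangeStreamDocument<Document> changeStreamDocument) -> {
                    logger.info("Change detected: {}", changeStreamDocument);

                    Document fullDocument = changeStreamDocument.getFullDocument();
                    if (fullDocument != null) {
                        Document info = (Document) fullDocument.get("info");
                        if (info != null) {
                            logger.info("Detected change in 'info': {}", info.toJson());
                        } else {
                            logger.warn("No 'info' field found in the document");
                        }
                    } else {
                        logger.warn("No full document found in the change stream event");
                    }
                });
    }
}
```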

which gave me the following output:

```
2024-06-20T17:09:14.510+05:30 INFO 77851 --- [ main] o.m.listener.ChangeStreamListener : Change detected: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "82667414E2000000012B042C0100296E5A10046714BBF223D349038E212A7EDA0FC474463C6F7065726174696F6E54797065003C7570646174650046646F63756D656E744B657900463C5F6964003C31323334353600000004"}, namespace=test.changeStreams, destinationNamespace=null, fullDocument=null, fullDocumentBeforeChange=null, documentKey={"_id": "123456"}, clusterTime=Timestamp{value=7382548650062249985, seconds=1718883554, inc=1}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"info.area_name": "new"}, truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null, splitEvent=null, wallTime=BsonDateTime{value=1718883554512}}
```

and I can see that the change stream reports the change inside the subdocument.

Could you try the sample code above and let us know if you are still facing the issue?

Best regards
Aasawari

Hi @Aasawari

Thank you for your response and the provided code.

I ran the code against the same document and modified the info.area_name field. Unfortunately, I did not receive the corresponding change event.

When I removed the line Filters.exists("updateDescription.updatedFields.info") and ran it again, the change event was received. The logs were as follows:

```
2024-06-23 00:15:26.854  INFO 90541 --- [           main] c.i.m.s.impl.ChangeStreamTestService     : Change detected: ChangeStreamDocument{ operationType=null, resumeToken={"_data": "826676F89E000000162B042C0100296E5A1004FC26F660C34C45348AAFB9D84F4EDE0E463C6F7065726174696F6E54797065003C7570646174650046646F63756D656E744B657900463C5F6964003C31323334353600000004"}, namespace=null, destinationNamespace=null, fullDocument=Document{{info=Document{{area_name=new, type_name=bar, year=2024, description=Sample area foo with type bar}}}}, fullDocumentBeforeChange=null, documentKey=null, clusterTime=null, updateDescription=null, txnNumber=null, lsid=null, wallTime=null}
2024-06-23 00:15:26.855  INFO 90541 --- [           main] c.i.m.s.impl.ChangeStreamTestService     : Detected change in 'info': {"area_name": "new", "type_name": "bar", "year": 2024, "description": "Sample area foo with type bar"}
```

Additionally, I noticed that your code includes Aggregates.project(new Document("fullDocument.info", 1)). This stage keeps only the resume token (_id) and fullDocument.info in each event, which is why operationType, namespace, documentKey, and updateDescription are all null in the log above. When I removed this line and ran it again, the output was as follows:

```
2024-06-23 00:16:00.000  INFO 90554 --- [           main] c.i.m.s.impl.ChangeStreamTestService     : Change detected: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "826676F8BF000000132B042C0100296E5A1004FC26F660C34C45348AAFB9D84F4EDE0E463C6F7065726174696F6E54797065003C7570646174650046646F63756D656E744B657900463C5F6964003C31323334353600000004"}, namespace=sample_mflix.testcs, destinationNamespace=null, fullDocument=Document{{_id=123456, info=Document{{area_name=foo, type_name=bar, year=2024, description=Sample area foo with type bar}}, state=1}}, fullDocumentBeforeChange=null, documentKey={"_id": "123456"}, clusterTime=Timestamp{value=7383362138342948883, seconds=1719072959, inc=19}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"info.area_name": "foo"}, truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1719072959979}}
2024-06-23 00:16:00.008  INFO 90554 --- [           main] c.i.m.s.impl.ChangeStreamTestService     : Detected change in 'info': {"area_name": "foo", "type_name": "bar", "year": 2024, "description": "Sample area foo with type bar"}
```
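If the only purpose of the $project stage is to trim the full document, an inclusion projection that also lists the event metadata avoids the null fields seen when projecting only fullDocument.info. A minimal sketch, assuming Java 15+ for the text block; the class and constant names are hypothetical, and the stage is meant to be parsed with org.bson.Document.parse(...) and used in place of the original project stage.

```java
// Hypothetical helper class (not part of the MongoDB driver or Spring Data MongoDB).
public final class InfoEventProjection {

    // Inclusion $project stage as JSON, intended for org.bson.Document.parse(...).
    // _id (the resume token) is kept automatically by an inclusion projection and
    // must not be excluded in a change stream. "ns" is the event's namespace field.
    // Fields not listed here (e.g. clusterTime, wallTime) are dropped, so the driver
    // will still report those as null.
    public static final String PROJECT_INFO_WITH_METADATA = """
            { "$project": {
                "operationType": 1,
                "ns": 1,
                "documentKey": 1,
                "updateDescription": 1,
                "fullDocument.info": 1
            } }
            """;

    private InfoEventProjection() {
    }

    public static void main(String[] args) {
        System.out.println(PROJECT_INFO_WITH_METADATA);
    }
}
```

Keeping updateDescription in the projection also preserves the dotted updatedFields keys (such as "info.area_name"), which a listener may want to log or inspect.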

I’m not sure what causes this difference, so here are a few more details about my setup.

I am developing with Spring Boot and Spring Data MongoDB, using MongoDB Java driver version 4.9.1. The MongoDB cluster is running on Atlas, version 7.0.

Finally, thank you again for your response.