We use MongoDB change streams heavily to detect critical changes on several collections.
Recently we noticed that, after a scheduled maintenance by the MongoDB Atlas team, our application stopped receiving change stream events.
Node.js version: v14.18.0
MongoDB Node.js driver version: ^3.7.3
Below is our MongoWatcher utility, which provides resume and error-handling capabilities.
import { delay } from 'lodash';
import { logErrorAndSendEmailAlert } from '../alerts/error-alerts';
import { logger } from '../logger/logger';
/**
 * Optional callbacks, each invoked when the change stream emits the matching event.
 */
interface MongoWatcherListeners {
  /** Receives every document delivered by a "change" event. */
  onChange?: (change: any) => void;
  /** Receives the payload of a "close" event. */
  onClose?: (data: any) => void;
  /** Receives the payload of an "error" event. */
  onError?: (data: any) => void;
  /** Receives the payload of an "end" event. */
  onEnd?: (data: any) => void;
}

/**
 * Options accepted by the MongoWatcher constructor.
 */
interface MongoWatcherConfig {
  /** MongoDB client used to reach the watched collection. */
  client: any;
  /** Name of the collection whose changes are watched. */
  collectionName: string;

  /** Re-create the stream after an "error" event. Default: `true`. */
  reWatchOnError?: boolean;
  /** Re-create the stream after an "end" event. Default: `true`. */
  reWatchOnEnd?: boolean;
  /**
   * Re-create the stream after a "close" event, i.e. when `changeStream.close()`
   * was called explicitly. Default: `false`.
   */
  reWatchOnClose?: boolean;
  /** Resume from the most recently received resume token when re-watching. Default: `true`. */
  useResumeToken?: boolean;

  /**
   * Aggregation pipeline stages applied to change stream documents, used to filter
   * or reshape them before they are delivered.
   */
  pipeline?: any[];
  /** Event callbacks; every one of them is optional. */
  listeners?: MongoWatcherListeners;
  /** Extra options forwarded to the MongoDB change stream. */
  watchOptions?: any;
}
/**
 * Metadata attached to the log lines and alerts emitted by a MongoWatcher.
 */
interface MongoWatcherMeta {
  /**
   * The origin of the registered watcher to be used with logging.
   * Defaults to "MongoWatcher-collection-name-here".
   */
  origin?: any;
  /**
   * Any additional fields to print with logs (e.g. `{ from: 'BotDashboard Controller' }`).
   * Declared explicitly because TypeScript otherwise rejects unknown keys in object literals.
   */
  [key: string]: unknown;
}
/** Registry of the change streams opened by MongoWatcher instances (read by closeAllChangeStreams). */
const allChangeStreams: Array<any> = [];
/**
 * @class MongoWatcher
 * @classdesc Class representing a wrapper utility around MongoDB change streams.
 * When the stream emits `error` or `end` (and `close`, if enabled) it is torn down and
 * re-created after a short delay; with `useResumeToken` it resumes after the last received event.
 *
 * NOTE(review): only `error`/`end`/`close` events trigger a re-watch. A stream that stays open
 * but stops delivering events emits none of them and is therefore not detected by this class;
 * an idle/heartbeat check (or a driver upgrade) would be needed for that situation.
 *
 * @param {MongoWatcherConfig} config - {@link MongoWatcherConfig | MongoWatcher configuration} object containing different options.
 * @param {MongoWatcherMeta} [meta] - {@link MongoWatcherMeta | optional MongoWatcher meta data} - pass object containing anything that needs to be printed with logs
 */
class MongoWatcher {
  /** Delay before re-watching, giving transient connection problems time to recover. */
  private static readonly reWatchDelayMs = 5000;

  private config: MongoWatcherConfig;
  private meta: MongoWatcherMeta;
  private changeStream: any | undefined;
  /** True while a delayed re-watch is scheduled, so overlapping triggers open only one stream. */
  private reWatchPending = false;

  constructor(config: MongoWatcherConfig, meta?: MongoWatcherMeta) {
    const defaultConfig: Omit<MongoWatcherConfig, 'client' | 'collectionName'> = {
      reWatchOnError: true,
      reWatchOnEnd: true,
      reWatchOnClose: false,
      useResumeToken: true,
      pipeline: [],
      listeners: {},
      watchOptions: {},
    };
    this.config = { ...defaultConfig, ...config };
    // Keep a private copy of the watch options: the resume token is written into them on every
    // change, and mutating the caller's object would leak this watcher's token into any other
    // watcher sharing it. Spreading `undefined` (an explicit `watchOptions: undefined`) gives `{}`.
    this.config.watchOptions = { ...this.config.watchOptions };
    this.meta = { origin: `MongoWatcher-${this.config.collectionName}`, ...meta };
    this.changeStream = undefined;
  }

  /**
   * Start watching changes via MongoDB change streams using the configured pipeline and
   * options, with encapsulated re-watch and resume logic.
   *
   * @param isReWatch - `true` when invoked by the internal re-watch; only used for logging.
   */
  public watch(isReWatch = false): void {
    // Never leave an earlier stream running next to the new one (e.g. watch() called twice).
    this.teardownStream();
    const { client, listeners = {}, ...restOfTheConfig } = this.config;
    const {
      collectionName,
      pipeline = [],
      watchOptions = {},
      useResumeToken,
      reWatchOnError,
      reWatchOnEnd,
      reWatchOnClose,
    } = restOfTheConfig;
    const { onChange, onError, onEnd, onClose } = listeners;
    const defaultWatchOptions = { fullDocument: 'updateLookup' };
    try {
      const collectionObj: any = client.db().collection(collectionName);
      const changeStream: any = collectionObj.watch(pipeline, {
        ...defaultWatchOptions,
        ...watchOptions,
      });
      // Register right away so teardownStream() can always find and clean it up.
      this.changeStream = changeStream;
      allChangeStreams.push(changeStream);
      logger.info(`${this.meta.origin}: Started watching change stream events`, {
        config: restOfTheConfig,
        isReWatch,
        ...this.meta,
      });

      changeStream.on('change', (change: any) => {
        const resumeToken = change?._id;
        // Track the resume token so a re-watch continues right after this event.
        if (useResumeToken && resumeToken != null) {
          this.setResumePoint(resumeToken);
        }
        logger.info(`${this.meta.origin}: Received new change stream event`, {
          resumeToken,
          ...this.meta,
        });
        logger.debug(`[${this.meta.origin}]: change event`, { ...this.meta, change });
        this.invokeListener('onChange', onChange, change);
      });

      changeStream.on('error', (data: any) => {
        if (MongoWatcher.isResumePointLost(data)) {
          // Resuming from the stored position would fail the same way on every attempt, so
          // start from "now". Events between the lost position and now are not delivered.
          this.clearResumePoint();
        }
        logErrorAndSendEmailAlert({
          title: reWatchOnError
            ? `Change stream errored! NO ACTION NEEDED - IT WILL BE RESUMED SHORTLY!`
            : `Change stream errored! Re-watch on error is disabled - the stream will NOT be resumed!`,
          origin: this.meta.origin,
          // Keep the driver's own error (message + stack); codeName is absent on e.g. network errors.
          error: MongoWatcher.toError(data),
          meta: { ...this.meta, config: restOfTheConfig, data },
        });
        this.invokeListener('onError', onError, data);
        if (reWatchOnError) {
          this.reWatch();
        }
      });

      changeStream.on('end', (data: any) => {
        logger.warn(`${this.meta.origin}: Change stream ended!`, {
          ...this.meta,
          config: restOfTheConfig,
          data,
        });
        this.invokeListener('onEnd', onEnd, data);
        if (reWatchOnEnd) {
          this.reWatch();
        }
      });

      changeStream.on('close', (data: any) => {
        logger.info(`${this.meta.origin}: Change stream closed!`, {
          ...this.meta,
          config: restOfTheConfig,
          data,
        });
        this.invokeListener('onClose', onClose, data);
        if (reWatchOnClose) {
          this.reWatch();
        }
      });
    } catch (error) {
      logErrorAndSendEmailAlert({
        title: `MongoWatcher: Error inside ${this.meta.origin}. Rewatch will be triggered!`,
        origin: this.meta.origin,
        error: MongoWatcher.toError(error),
        meta: { ...this.meta, config: restOfTheConfig },
      });
      this.reWatch();
    }
  }

  /** Tear down the current stream and start a new one after a short recovery delay. */
  private reWatch(): void {
    this.teardownStream();
    if (this.reWatchPending) {
      // A re-watch is already scheduled; scheduling another would open a duplicate stream.
      return;
    }
    this.reWatchPending = true;
    logger.warn(`${this.meta.origin}: Rewatch will be triggered shortly!`, { ...this.meta });
    delay(() => {
      // Reset first: if watch() fails it calls reWatch() again, which must be able to reschedule.
      this.reWatchPending = false;
      this.watch(true);
      // eslint-disable-next-line @typescript-eslint/no-unused-vars
      const { client, listeners, ...restOfTheConfig } = this.config;
      logger.warn(
        `${this.meta.origin}: Re-triggered the watcher on detection of absence/closure of stream`,
        { ...this.meta, config: restOfTheConfig }
      );
    }, MongoWatcher.reWatchDelayMs);
  }

  /** Detach, unregister and close the current stream, if any. Safe to call repeatedly. */
  private teardownStream(): void {
    const stream = this.changeStream;
    if (!stream) {
      return;
    }
    this.changeStream = undefined;
    // Drop it from the global registry so repeated re-watches don't accumulate dead streams.
    const index = allChangeStreams.indexOf(stream);
    if (index !== -1) {
      allChangeStreams.splice(index, 1);
    }
    stream.removeAllListeners();
    // An EventEmitter that emits 'error' with no listener throws; keep a no-op listener so a
    // late error from the discarded stream cannot crash the process.
    stream.on('error', () => undefined);
    const reportCloseFailure = (error: unknown): void => {
      logger.warn(`${this.meta.origin}: Failed to close previous change stream`, {
        ...this.meta,
        error,
      });
    };
    try {
      // close() returns a promise when called without a callback; don't leave it unhandled.
      Promise.resolve(stream.close()).catch(reportCloseFailure);
    } catch (error) {
      reportCloseFailure(error);
    }
  }

  /** Resume after `resumeToken` on the next watch, dropping start points the server won't combine. */
  private setResumePoint(resumeToken: any): void {
    const options = this.config.watchOptions;
    options.resumeAfter = resumeToken;
    // The server accepts only one of resumeAfter / startAfter / startAtOperationTime; a start
    // point from the caller's initial options would otherwise make every re-watch fail.
    delete options.startAfter;
    delete options.startAtOperationTime;
  }

  /** Forget every stored start point so the next watch starts from the current time. */
  private clearResumePoint(): void {
    const options = this.config.watchOptions;
    delete options.resumeAfter;
    delete options.startAfter;
    delete options.startAtOperationTime;
  }

  /**
   * Whether the error says the stored resume position no longer exists on the server.
   * 286 = ChangeStreamHistoryLost; 280 = ChangeStreamFatalError, which older servers report
   * when the resume token can no longer be found (and which is not resumable either way).
   */
  private static isResumePointLost(error: any): boolean {
    return (
      error?.codeName === 'ChangeStreamHistoryLost' ||
      error?.code === 286 ||
      error?.codeName === 'ChangeStreamFatalError' ||
      error?.code === 280
    );
  }

  /** Normalize anything thrown/emitted into an Error, preserving real Error instances. */
  private static toError(value: any): Error {
    if (value instanceof Error) {
      return value;
    }
    return new Error(String(value?.codeName ?? value?.message ?? value));
  }

  /**
   * Call an optional user callback, reporting (rather than propagating) anything it throws and
   * any rejection of a promise it returns, so a failing callback can neither break the stream's
   * event handling nor skip the re-watch that follows it.
   */
  private invokeListener(
    name: string,
    listener: ((data: any) => void) | undefined,
    data: unknown
  ): void {
    if (!listener) {
      return;
    }
    const report = (error: unknown): void => {
      logErrorAndSendEmailAlert({
        title: `MongoWatcher: ${name} listener failed inside ${this.meta.origin}`,
        origin: this.meta.origin,
        error: MongoWatcher.toError(error),
        meta: { ...this.meta },
      });
    };
    try {
      const result: unknown = listener(data);
      if (result instanceof Promise) {
        result.catch(report);
      }
    } catch (error) {
      report(error);
    }
  }
}
/**
 * Close every change stream registered by MongoWatcher instances (useful for global cleanup).
 *
 * Listeners are detached before closing, so these closes do not trigger the watchers'
 * re-watch handlers. The registry is emptied, so a second call does not close the same
 * streams again.
 *
 * NOTE(review): a re-watch that a watcher had already scheduled before this call is not
 * cancelled here and will open a new stream when its delay elapses.
 */
function closeAllChangeStreams(): void {
  // Take the current entries and empty the shared registry in one step.
  const streams = allChangeStreams.splice(0, allChangeStreams.length);
  for (const changeStream of streams) {
    if (!changeStream) {
      continue;
    }
    changeStream.removeAllListeners();
    // An EventEmitter with no 'error' listener throws when it emits 'error'; keep a no-op one
    // so a late error from a stream being closed cannot crash the process.
    changeStream.on('error', () => undefined);
    try {
      // Best-effort: close() returns a promise when called without a callback, and failing to
      // close an already-broken stream leaves nothing further to clean up.
      Promise.resolve(changeStream.close()).catch(() => undefined);
    } catch {
      // Same best-effort policy for a synchronous failure.
    }
  }
}
export { MongoWatcher, closeAllChangeStreams };
Using the utility above:
// Watch `collectionName` with no extra pipeline stages and hand every change document
// to processCaseChanges.
const config = {
  client,
  collectionName,
  pipeline: [],
  listeners: {
    onChange(data) {
      processCaseChanges(data);
    },
  },
};
// NOTE(review): `from` is not declared on MongoWatcherMeta as shown (only `origin` is); this
// object literal type-checks only if that interface accepts extra keys.
const caseWatcher = new MongoWatcher(config, { from: 'BotDashboard Controller' });
caseWatcher.watch();
Most of the time, when there are connection issues, the error handler detects the error/closure and resumes the change streams accordingly. This time, however, nothing was detected on the change streams: they simply went silent after the maintenance, and two days later they suddenly started working again!
Can someone please suggest how we can make our change stream integration reliable?