Change streams stopped receiving events after the MongoDB Atlas scheduled maintenance

We use MongoDB change streams heavily to detect various critical changes across different collections.

Recently, we noticed that after a scheduled maintenance by the MongoDB Atlas team, our application stopped receiving change stream events.

Node.js version: v14.18.0
MongoDB Node.js driver version: ^3.7.3

Below is our MongoWatcher utility, which wraps change streams with resume and error-handling logic:

import { delay } from 'lodash';
import { logErrorAndSendEmailAlert } from '../alerts/error-alerts';
import { logger } from '../logger/logger';


/**
 * Configuration for the MongoWatcher class.
 */
interface MongoWatcherConfig {
  /**
   * The MongoDB client object.
   */
  client: any;

  /**
   * The name of the MongoDB collection to watch.
   */
  collectionName: string;

  /**
   * Determines whether to re-watch on error events. Defaults to `true`.
   */
  reWatchOnError?: boolean;

  /**
   * Determines whether to re-watch on end events. Defaults to `true`.
   */
  reWatchOnEnd?: boolean;

  /**
   * Determines whether to re-watch on close events (if `changeStream.close()` was called explicitly). Defaults to `false`.
   */
  reWatchOnClose?: boolean;

  /**
   * Determines whether to use the last resume token when re-watching. Defaults to `true`.
   */
  useResumeToken?: boolean;

  /**
   * An array of aggregation pipeline stages through which to pass change stream documents.
   * This allows for filtering and manipulating the change stream documents.
   */
  pipeline?: any[];

  /**
   * Optional event listener callbacks.
   */
  listeners?: {
    /**
     * Callback function to be called on "change" events.
     */
    onChange?: (change: any) => void;

    /**
     * Callback function to be called on "close" events.
     */
    onClose?: (data: any) => void;

    /**
     * Callback function to be called on "error" events.
     */
    onError?: (data: any) => void;

    /**
     * Callback function to be called on "end" events.
     */
    onEnd?: (data: any) => void;
  };

  /**
   * Options for the MongoDB change stream.
   */
  watchOptions?: any;
}

/**
 * Metadata for the MongoWatcher class.
 */
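interface MongoWatcherMeta {
  /**
   * The origin of the registered watcher to be used with logging.
   * Defaults to "MongoWatcher-collection-name-here".
   */
  origin?: any;

  /**
   * Any additional fields to print with logs (e.g. `{ from: 'BotDashboard Controller' }`).
   */
  [key: string]: any;
}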

// keep track of all change streams
const allChangeStreams: any[] = [];

/**
 * @class MongoWatcher
 * @classdesc Class representing a wrapper utility around MongoDB change streams
 *
 * @param {MongoWatcherConfig} config - {@link MongoWatcherConfig | MongoWatcher configuration} object containing different options.
 * @param {MongoWatcherMeta} [meta] - {@link MongoWatcherMeta | optional MongoWatcher metadata} - pass an object containing anything that needs to be printed with logs
 */
class MongoWatcher {
  private config: MongoWatcherConfig;
  private meta: MongoWatcherMeta;
  private changeStream: any | undefined;

  constructor(config: MongoWatcherConfig, meta?: MongoWatcherMeta) {
    const defaultConfig: any = {
      reWatchOnError: true,
      reWatchOnEnd: true,
      reWatchOnClose: false,
      useResumeToken: true,
      pipeline: [],
      listeners: {},
      watchOptions: {},
    };

    this.config = { ...defaultConfig, ...config };
    this.meta = { origin: `MongoWatcher-${this.config.collectionName}`, ...meta };
    this.changeStream = undefined;
  }

  /**
   * Start watching changes via MongoDB Change streams considering provided config
   * & with encapsulated rewatch & resuming logic
   */
  public watch(isReWatch = false): void {
    const { client, listeners = {}, ...restOfTheConfig } = this.config;
    const {
      collectionName,
      pipeline = [],
      watchOptions = {},
      useResumeToken,
      reWatchOnError,
      reWatchOnEnd,
      reWatchOnClose,
    } = restOfTheConfig;

    const { onChange, onError, onEnd, onClose } = listeners;

    const defaultWatchOptions: any = { fullDocument: 'updateLookup' };

    try {
      const collectionObj: any = client.db().collection(collectionName);
      this.changeStream = collectionObj.watch(pipeline, {
        ...defaultWatchOptions,
        ...watchOptions,
      });

      logger.info(
        `🚀 🚀 🚀 🚀 🚀 ${this.meta.origin}: Started watching change stream events 🚀 🚀 🚀 🚀 🚀`,
        { config: restOfTheConfig, isReWatch, ...this.meta }
      );

      allChangeStreams.push(this.changeStream);

      this.changeStream.on('change', (change) => {
        // track resume token if resuming is configured
        const resumeToken = change._id;
        if (useResumeToken) {
          this.config.watchOptions.resumeAfter = resumeToken;
        }

        logger.info(
          `🚚 🚚 🚚 🚚 🚚 ${this.meta.origin}: Received new change stream event 🚚 🚚 🚚 🚚 🚚`,
          { resumeToken, ...this.meta }
        );

        logger.debug(`[${this.meta.origin}]: change event`, { ...this.meta, change });

        // call custom callback (if provided)
        if (onChange) {
          onChange(change);
        }
      });

      this.changeStream.on('error', (data) => {
        if (data?.codeName === 'ChangeStreamHistoryLost') {
          // the resume point is no longer in the oplog; discard the resume token
          // to avoid getting the same error infinitely
          delete this.config.watchOptions.resumeAfter;
        }

        logErrorAndSendEmailAlert({
          title: `❌ ❌ Change stream errored! NO ACTION NEEDED - IT WILL BE RESUMED SHORTLY!`,
          origin: this.meta.origin,
          // keep the original error (message + stack) when the driver provides one
          error: data instanceof Error ? data : new Error(data?.codeName),
          meta: { ...this.meta, config: restOfTheConfig, data },
        });

        // call custom callback (if provided)
        if (onError) {
          onError(data);
        }

        if (reWatchOnError) {
          this.reWatch();
        }
      });

      this.changeStream.on('end', (data) => {
        logger.warn(
          `👋 👋 👋 👋 👋 ${this.meta.origin}: Change stream ended! 👋 👋 👋 👋 👋`,
          { ...this.meta, config: restOfTheConfig, data }
        );

        // call custom callback (if provided)
        if (onEnd) {
          onEnd(data);
        }

        if (reWatchOnEnd) {
          this.reWatch();
        }
      });

      this.changeStream.on('close', (data) => {
        logger.info(
          `🔌 🔌 🔌 🔌 🔌 ${this.meta.origin}: Change stream closed! 🔌 🔌 🔌 🔌 🔌`,
          { ...this.meta, config: restOfTheConfig, data }
        );

        // call custom callback (if provided)
        if (onClose) {
          onClose(data);
        }

        if (reWatchOnClose) {
          this.reWatch();
        }
      });
    } catch (error) {
      logErrorAndSendEmailAlert({
        title: `MongoWatcher: Error inside ${this.meta.origin}. Rewatch will be triggered!`,
        origin: this.meta.origin,
        error,
        meta: { ...this.meta, config: restOfTheConfig },
      });
      this.reWatch();
    }
  }

  // private method
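  private reWatch(): void {
    if (this.changeStream) {
      // cleanup existing watchers & stream
      this.changeStream.removeAllListeners();
      // close() returns a promise when called without a callback; don't leave a failed close unhandled
      Promise.resolve(this.changeStream.close()).catch((error) =>
        logger.warn(`${this.meta.origin}: Failed to close the previous change stream`, { ...this.meta, error })
      );

      // drop the closed stream from the registry so it does not grow with every re-watch
      const index = allChangeStreams.indexOf(this.changeStream);
      if (index !== -1) {
        allChangeStreams.splice(index, 1);
      }
    }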

    logger.warn(
      `⌛ ⌛ ⌛ ⌛ ⌛ ${this.meta.origin}: Rewatch will be triggered shortly! ⌛ ⌛ ⌛ ⌛ ⌛`,
      { ...this.meta }
    );

    // add some delay for recovery (if connection issue) & then trigger watch
    delay(() => {
      this.watch(true);

      // eslint-disable-next-line @typescript-eslint/no-unused-vars
      const { client, listeners = {}, ...restOfTheConfig } = this.config;

      logger.warn(
        `✅ ✅ ✅ ✅ ✅ ${this.meta.origin}: Re-triggered the watcher on detection of absence/closure of stream ✅ ✅ ✅ ✅ ✅`,
        { ...this.meta, config: restOfTheConfig }
      );
    }, 5000);
  }
}

/** close all registered change streams : This will be useful for global cleanup (if required) */
function closeAllChangeStreams(): void {
  allChangeStreams?.forEach((changeStream) => {
    if (changeStream) {
      // cleanup existing watchers & stream
      changeStream.removeAllListeners();
      changeStream.close();
    }
  });
}

export { MongoWatcher, closeAllChangeStreams };

Using the above utility:

const config = {
  client,
  collectionName,
  pipeline: [],
  listeners: {
    onChange: (data) => {
      processCaseChanges(data);
    },
  },
};

const caseWatcher = new MongoWatcher(config, { from: 'BotDashboard Controller' });
caseWatcher.watch();
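The `reWatch()` method above always waits a fixed 5 seconds before calling `watch(true)` again. If the cluster stays unreachable for longer, for example during a maintenance window, an exponential backoff spreads the retries out instead of hammering the cluster every 5 seconds. The helper below is a minimal sketch, not part of the original utility: `computeReWatchDelay` is a hypothetical name, and the 5-second base, 60-second cap and "equal jitter" strategy are assumptions to tune.

```typescript
/**
 * Hypothetical helper (not part of the original MongoWatcher):
 * exponential backoff with "equal jitter" for re-watch attempts.
 *
 * @param attempt 0-based count of consecutive re-watch attempts
 * @param baseMs  delay ceiling for the first attempt (assumed 5000 ms, like the original fixed delay)
 * @param maxMs   upper bound on the delay ceiling (assumed 60000 ms)
 * @param random  source of randomness in [0, 1); injectable for testing
 */
function computeReWatchDelay(
  attempt: number,
  baseMs = 5000,
  maxMs = 60000,
  random: () => number = Math.random
): number {
  // Exponential growth: baseMs * 2^attempt, capped at maxMs.
  const ceiling = Math.min(maxMs, baseMs * 2 ** Math.max(0, attempt));
  // Equal jitter: half of the delay is fixed, the other half is random, so many
  // watchers restarting at the same moment do not all reconnect in lockstep.
  return Math.floor(ceiling / 2 + random() * (ceiling / 2));
}
```

One way to wire it in (again an assumption): keep a hypothetical `reWatchAttempts` counter on the instance, pass `computeReWatchDelay(this.reWatchAttempts++)` to `delay` instead of `5000`, and reset the counter to 0 once the stream is known to be healthy, for example in the `change` handler.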

Most of the time, when there are connection issues, the error handler detects the error or closure and resumes the change stream accordingly. This time, however, nothing was detected on the change streams: they simply went silent after the maintenance and, 2 days later, suddenly started working again!

Can someone please suggest how we can make our change stream integration more reliable?
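The failure described above produced no `error`, `end` or `close` event, so none of the re-watch paths in the utility ran. One possible safeguard is a watchdog that periodically checks whether the stream is still making progress and forces a re-watch when it is not. The sketch below is hypothetical: `StaleStreamWatchdog`, `getProgressMarker` and `onStale` are illustrative names, not MongoDB driver APIs. It deliberately does not use the time of the last `change` event, because a quiet collection legitimately produces none. Instead it assumes a progress marker that advances even without writes: on MongoDB 4.0.7+ the server returns a post-batch resume token with each batch, and the driver's `changeStream.resumeToken` property is expected to follow it, but verify that behavior for the driver version in use before relying on it.

```typescript
/**
 * Hypothetical watchdog (not a MongoDB driver API): calls `onStale` when the
 * value returned by `getProgressMarker` has not changed for `staleAfterMs`.
 */
class StaleStreamWatchdog {
  private lastMarker: string | undefined;
  private lastProgressAt: number;
  private timer: ReturnType<typeof setInterval> | undefined;

  constructor(
    private readonly getProgressMarker: () => unknown,
    private readonly onStale: () => void,
    private readonly staleAfterMs = 5 * 60 * 1000, // assumed threshold: 5 minutes
    private readonly now: () => number = Date.now // injectable clock for testing
  ) {
    this.lastMarker = this.serialize(getProgressMarker());
    this.lastProgressAt = now();
  }

  /** Runs one check; returns true if the stream was reported as stale. */
  check(): boolean {
    const marker = this.serialize(this.getProgressMarker());
    const t = this.now();
    if (marker !== this.lastMarker) {
      // The stream advanced: remember the new marker and when it was seen.
      this.lastMarker = marker;
      this.lastProgressAt = t;
      return false;
    }
    if (t - this.lastProgressAt >= this.staleAfterMs) {
      // No progress for too long: report once, then start a new window.
      this.lastProgressAt = t;
      this.onStale();
      return true;
    }
    return false;
  }

  start(checkEveryMs = 30000): void {
    this.stop();
    this.timer = setInterval(() => this.check(), checkEveryMs);
  }

  stop(): void {
    if (this.timer !== undefined) {
      clearInterval(this.timer);
      this.timer = undefined;
    }
  }

  // Resume tokens are objects, so compare them by content rather than by reference.
  private serialize(marker: unknown): string | undefined {
    return marker === undefined ? undefined : JSON.stringify(marker);
  }
}
```

Inside `MongoWatcher`, this could be wired up (an untested assumption) by creating the watchdog after `collectionObj.watch(...)` with `() => this.changeStream?.resumeToken` as the marker and `() => this.reWatch()` as `onStale`, calling `start()`, and calling `stop()` at the top of `reWatch()`. The threshold should sit well above the longest expected gap between batches.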

Node.js driver 3.7.3 is 3 years old at this point, and the driver has gone through several major revisions since (the current version is 6.8.0). One recommendation would be to try updating the driver to take advantage of the stability and performance improvements introduced across those major releases.
