Navigation

Change Streams

New in version 3.6.

Change streams allow applications to access real-time data changes without the complexity and risk of tailing the oplog. Applications can use change streams to subscribe to all data changes on a single collection, a database, or an entire deployment, and immediately react to them. Because change streams use the aggregation framework, applications can also filter for specific changes or transform the notifications at will.

Availability

Change streams are available for replica sets and sharded clusters:

  • Storage Engine.

    The replica sets and sharded clusters must use the WiredTiger storage engine. Change streams can also be used on deployments that employ MongoDB’s encryption-at-rest feature.

  • Replica Set Protocol Version.

    The replica sets and sharded clusters must use replica set protocol version 1 (pv1).

  • Read Concern “majority” Enablement.

    Starting in MongoDB 4.2, change streams are available regardless of the "majority" read concern support; that is, read concern majority support can be either enabled (default) or disabled to use change streams.

    In MongoDB 4.0 and earlier, change streams are available only if "majority" read concern support is enabled (default).

Watch Collection/Database/Deployment

You can open change streams against:

Target Description
A collection

You can open a change stream cursor for a single collection (except system collections, or any collections in the admin, local, and config databases).

The examples on this page use the MongoDB drivers to open and work with a change stream cursor for a single collection. See also the mongo shell method db.collection.watch().

A database

Starting in MongoDB 4.0, you can open a change stream cursor for a single database (excluding admin, local, and config database) to watch for changes to all its non-system collections.

For the MongoDB driver method, refer to your driver documentation. See also the mongo shell method db.watch().

A deployment

Starting in MongoDB 4.0, you can open a change stream cursor for a deployment (either a replica set or a sharded cluster) to watch for changes to all non-system collections across all databases except for admin, local, and config.

For the MongoDB driver method, refer to your driver documentation. See also the mongo shell method Mongo.watch().

Change Stream Examples

The examples on this page use the MongoDB drivers to illustrate how to open a change stream cursor for a collection and work with the change stream cursor.

Open A Change Stream

    To open a change stream:

    • For a replica set, you can issue the open change stream operation from any of the data-bearing members.
    • For a sharded cluster, you must issue the open change stream operation from the mongos.

    The following example opens a change stream for a collection and iterates over the cursor to retrieve the change stream documents. [1]

    The Python examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.

    cursor = db.inventory.watch()
    document = next(cursor)
    

    The Java examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.

    MongoCursor<ChangeStreamDocument<Document>> cursor = inventory.watch().iterator();
    ChangeStreamDocument<Document> next = cursor.next();
    

    The Node.js examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.

    The following example uses stream to process the change events.

    const collection = db.collection('inventory');
    const changeStream = collection.watch();
    changeStream.on('change', next => {
      // process next document
    });
    

    Alternatively, you can also use iterator to process the change events:

    const changeStreamIterator = collection.watch();
    const next = await changeStreamIterator.next();
    

    The examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.

    $changeStream = $db->inventory->watch();
    $changeStream->rewind();
    
    $firstChange = $changeStream->current();
    
    $changeStream->next();
    
    $secondChange = $changeStream->current();
    

    The examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.

    cursor = db.inventory.watch()
    document = await cursor.next()
    

    The C examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.

    mongoc_collection_t *collection;
    bson_t *pipeline = bson_new ();
    bson_t opts = BSON_INITIALIZER;
    mongoc_change_stream_t *stream;
    const bson_t *change;
    const bson_t *resume_token;
    bson_error_t error;
    
    collection = mongoc_database_get_collection (db, "inventory");
    stream = mongoc_collection_watch (collection, pipeline, NULL /* opts */);
    mongoc_change_stream_next (stream, &change);
    if (mongoc_change_stream_error_document (stream, &error, NULL)) {
       MONGOC_ERROR ("%s\n", error.message);
    }
    
    mongoc_change_stream_destroy (stream);
    

    The C# examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.

    var cursor = inventory.Watch();
    while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch
    var next = cursor.Current.First();
    cursor.Dispose();
    

    The examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.

    cursor = inventory.watch.to_enum
    next_change = cursor.next
    

    The Go examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.

    cs, err := coll.Watch(ctx, mongo.Pipeline{})
    require.NoError(t, err)
    defer cs.Close(ctx)
    
    ok := cs.Next(ctx)
    next := cs.Current
    

    The Swift (Sync) examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.

    let inventory = db.collection("inventory")
    let changeStream = try inventory.watch()
    let next = changeStream.next()
    

    The Swift (Async) examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.

    let inventory = db.collection("inventory")
    
    // Option 1: retrieve next document via next()
    let next = inventory.watch().flatMap { cursor in
        cursor.next()
    }
    
    // Option 2: register a callback to execute for each document
    let result = inventory.watch().flatMap { cursor in
        cursor.forEach { event in
            // process event
            print(event)
        }
    }
    

    To retrieve the data change event from the cursor, iterate the change stream cursor. For information on the change stream event, see Change Events.

    While the connection to the MongoDB deployment remains open, the cursor remains open until one of the following occurs:

    • The cursor is explicitly closed.
    • An invalidate event occurs.
    • If the deployment is a sharded cluster, a shard removal may cause an open change stream cursor to close, and the closed change stream cursor may not be fully resumable.

    Note

    The lifecycle of an unclosed cursor is language-dependent.

    [1]Starting in MongoDB 4.0, you can specify a startAtOperationTime to open the cursor at a particular point in time. If the specified starting point is in the past, it must be in the time range of the oplog.

    Modify Change Stream Output

      You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:

      pipeline = [
          {'$match': {'fullDocument.username': 'alice'}},
          {'$addFields': {'newField': 'this is an added field!'}}
      ]
      cursor = db.inventory.watch(pipeline=pipeline)
      document = next(cursor)
      

      You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:

      MongoClient mongoClient = new MongoClient( new MongoClientURI("mongodb://host1:port1,host2:port2..."));
      
      // Select the MongoDB database and collection to open the change stream against
      
      MongoDatabase db = mongoClient.getDatabase("myTargetDatabase");
      
      MongoCollection<Document> collection = db.getCollection("myTargetCollection");
      
      // Create $match pipeline stage.
      List<Bson> pipeline = singletonList(Aggregates.match(Filters.or(
          Document.parse("{'fullDocument.username': 'alice'}"),
          Filters.in("operationType", asList("delete")))));
      
      // Create the change stream cursor, passing the pipeline to the
      // collection.watch() method
      
      MongoCursor<Document> cursor = collection.watch(pipeline).iterator();
      

      The pipeline list includes a single $match stage that filters any operations where the username is alice, or operations where the operationType is delete.

      Passing the pipeline to the watch() method directs the change stream to return notifications after passing them through the specified pipeline.

      You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:

      The following example uses stream to process the change events.

      const pipeline = [
        { $match: { 'fullDocument.username': 'alice' } },
        { $addFields: { newField: 'this is an added field!' } }
      ];
      
      const collection = db.collection('inventory');
      const changeStream = collection.watch(pipeline);
      changeStream.on('change', next => {
        // process next document
      });
      

      Alternatively, you can also use iterator to process the change events:

      const changeStreamIterator = collection.watch(pipeline);
      const next = await changeStreamIterator.next();
      

      You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:

      $pipeline = [
          ['$match' => ['fullDocument.username' => 'alice']],
          ['$addFields' => ['newField' => 'this is an added field!']],
      ];
      $changeStream = $db->inventory->watch($pipeline);
      $changeStream->rewind();
      
      $firstChange = $changeStream->current();
      
      $changeStream->next();
      
      $secondChange = $changeStream->current();
      

      You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:

      pipeline = [
          {'$match': {'fullDocument.username': 'alice'}},
          {'$addFields': {'newField': 'this is an added field!'}}
      ]
      cursor = db.inventory.watch(pipeline=pipeline)
      document = await cursor.next()
      

      You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:

      pipeline = BCON_NEW ("pipeline",
                           "[",
                           "{",
                           "$match",
                           "{",
                           "fullDocument.username",
                           BCON_UTF8 ("alice"),
                           "}",
                           "}",
                           "{",
                           "$addFields",
                           "{",
                           "newField",
                           BCON_UTF8 ("this is an added field!"),
                           "}",
                           "}",
                           "]");
      
      stream = mongoc_collection_watch (collection, pipeline, &opts);
      mongoc_change_stream_next (stream, &change);
      if (mongoc_change_stream_error_document (stream, &error, NULL)) {
         MONGOC_ERROR ("%s\n", error.message);
      }
      
      mongoc_change_stream_destroy (stream);
      

      You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:

      var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>()
          .Match(change =>
              change.FullDocument["username"] == "alice" ||
              change.OperationType == ChangeStreamOperationType.Delete)
          .AppendStage<ChangeStreamDocument<BsonDocument>, ChangeStreamDocument<BsonDocument>, BsonDocument>(
              "{ $addFields : { newField : 'this is an added field!' } }");
      
      var collection = database.GetCollection<BsonDocument>("inventory");
      using (var cursor = collection.Watch(pipeline))
      {
          while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch
          var next = cursor.Current.First();
      }
      

      You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:

      You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:

      pipeline := mongo.Pipeline{bson.D{{"$match", bson.D{{"$or",
      	bson.A{
      		bson.D{{"fullDocument.username", "alice"}},
      		bson.D{{"operationType", "delete"}}}}},
      }}}
      cs, err := coll.Watch(ctx, pipeline)
      require.NoError(t, err)
      defer cs.Close(ctx)
      
      ok := cs.Next(ctx)
      next := cs.Current
      

      You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:

      let pipeline: [BSONDocument] = [
          ["$match": ["fullDocument.username": "alice"]],
          ["$addFields": ["newField": "this is an added field!"]]
      ]
      let inventory = db.collection("inventory")
      let changeStream = try inventory.watch(pipeline, withEventType: BSONDocument.self)
      let next = changeStream.next()
      

      You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:

      let pipeline: [BSONDocument] = [
          ["$match": ["fullDocument.username": "alice"]],
          ["$addFields": ["newField": "this is an added field!"]]
      ]
      let inventory = db.collection("inventory")
      
      // Option 1: use next() to iterate
      let next = inventory.watch(pipeline, withEventType: BSONDocument.self).flatMap { changeStream in
          changeStream.next()
      }
      
      // Option 2: register a callback to execute for each document
      let result = inventory.watch(pipeline, withEventType: BSONDocument.self).flatMap { changeStream in
          changeStream.forEach { event in
              // process event
              print(event)
          }
      }
      

      Tip

      The _id field of the change stream event document act as the resume token. Do not use the pipeline to modify or remove the change stream event’s _id field.

      Starting in MongoDB 4.2, change streams will throw an exception if the change stream aggregation pipeline modifies an event’s _id field.

      See Change Events for more information on the change stream response document format.

      Lookup Full Document for Update Operations

        By default, change streams only return the delta of fields during the update operation. However, you can configure the change stream to return the most current majority-committed version of the updated document.

        To return the most current majority-committed version of the updated document, pass full_document='updateLookup' to the db.collection.watch() method.

        In the example below, all update operations notifications include a full_document field that represents the current version of the document affected by the update operation.

        cursor = db.inventory.watch(full_document='updateLookup')
        document = next(cursor)
        

        To return the most current majority-committed version of the updated document, pass FullDocument.UPDATE_LOOKUP to the db.collection.watch.fullDocument() method.

        In the example below, all update operations notifications include a FullDocument field that represents the current version of the document affected by the update operation.

        cursor = inventory.watch().fullDocument(FullDocument.UPDATE_LOOKUP).iterator();
        next = cursor.next();
        

        To return the most current majority-committed version of the updated document, pass { fullDocument: 'updateLookup' } to the collection.watch() method.

        In the example below, all update operations notifications include a fullDocument field that represents the current version of the document affected by the update operation.

        The following example uses stream to process the change events.

        const collection = db.collection('inventory');
        const changeStream = collection.watch({ fullDocument: 'updateLookup' });
        changeStream.on('change', next => {
          // process next document
        });
        

        Alternatively, you can also use iterator to process the change events:

        const changeStreamIterator = collection.watch({ fullDocument: 'updateLookup' });
        const next = await changeStreamIterator.next();
        

        To return the most current majority-committed version of the updated document, pass "fullDocument' => \MongoDB\Operation\ChangeStreamCommand::FULL_DOCUMENT_UPDATE_LOOKUP" to the watch() method.

        In the example below, all update operations notifications include a fullDocument field that represents the current version of the document affected by the update operation.

        $changeStream = $db->inventory->watch([], ['fullDocument' => \MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP]);
        $changeStream->rewind();
        
        $firstChange = $changeStream->current();
        
        $changeStream->next();
        
        $secondChange = $changeStream->current();
        

        To return the most current majority-committed version of the updated document, pass full_document='updateLookup' to the db.collection.watch() method.

        In the example below, all update operations notifications include a `full_document field that represents the current version of the document affected by the update operation.

        cursor = db.inventory.watch(full_document='updateLookup')
        document = await cursor.next()
        

        To return the most current majority-committed version of the updated document, pass the "fullDocument" option with the "updateLookup" value to the mongoc_collection_watch method.

        In the example below, all update operations notifications include a fullDocument field that represents the current version of the document affected by the update operation.

        BSON_APPEND_UTF8 (&opts, "fullDocument", "updateLookup");
        stream = mongoc_collection_watch (collection, pipeline, &opts);
        mongoc_change_stream_next (stream, &change);
        if (mongoc_change_stream_error_document (stream, &error, NULL)) {
           MONGOC_ERROR ("%s\n", error.message);
        }
        
        mongoc_change_stream_destroy (stream);
        

        To return the most current majority-committed version of the updated document, pass "FullDocument = ChangeStreamFullDocumentOption.UpdateLookup" to the collection.Watch() method.

        In the example below, all update operations notifications include a FullDocument field that represents the current version of the document affected by the update operation.

        var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup };
        var cursor = inventory.Watch(options);
        while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch
        var next = cursor.Current.First();
        cursor.Dispose();
        

        To return the most current majority-committed version of the updated document, pass full_document: 'updateLookup' to the watch() method.

        In the example below, all update operations notifications include a full_document field that represents the current version of the document affected by the update operation.

        cursor = inventory.watch([], full_document: 'updateLookup').to_enum
        next_change = cursor.next
        

        To return the most current majority-committed version of the updated document, SetFullDocument(options.UpdateLookup) change stream option.

        cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetFullDocument(options.UpdateLookup))
        require.NoError(t, err)
        defer cs.Close(ctx)
        
        ok := cs.Next(ctx)
        next := cs.Current
        

        To return the most current majority-committed version of the updated document, pass options: ChangeStreamOptions(fullDocument: .updateLookup) to the watch() method.

        let inventory = db.collection("inventory")
        let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
        let next = changeStream.next()
        

        To return the most current majority-committed version of the updated document, pass options: ChangeStreamOptions(fullDocument: .updateLookup) to the watch() method.

        let inventory = db.collection("inventory")
        
        // Option 1: use next() to iterate
        let next = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
            .flatMap { changeStream in
                changeStream.next()
            }
        
        // Option 2: register a callback to execute for each document
        let result = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
            .flatMap { changeStream in
                changeStream.forEach { event in
                    // process event
                    print(event)
                }
            }
        

        Note

        If there are one or more majority-committed operations that modified the updated document after the update operation but before the lookup, the full document returned may differ significantly from the document at the time of the update operation.

        However, the deltas included in the change stream document always correctly describe the watched collection changes that applied to that change stream event.

        See Change Events for more information on the change stream response document format.

        Resume a Change Stream

        Change streams are resumable by specifying a resume token to either resumeAfter or startAfter when opening the cursor.

        resumeAfter for Change Streams

          You can resume a change stream after a specific event by passing a resume token to resumeAfter when opening the cursor. For the resume token, use the _id value of the change stream event document. See Resume Tokens for more information on the resume token.

          Important

          • The oplog must have enough history to locate the operation associated with the token or the timestamp, if the timestamp is in the past.
          • You cannot use resumeAfter to resume a change stream after an invalidate event (for example, a collection drop or rename) closes the stream. Starting in MongoDB 4.2, you can use startAfter to start a new change stream after an invalidate event.

          You can use the resume_after modifier to resume notifications after the operation specified in the resume token. The resume_after modifier takes a value that must resolve to a resume token, e.g. resume_token in the example below.

          resume_token = cursor.resume_token
          cursor = db.inventory.watch(resume_after=resume_token)
          document = next(cursor)
          

          You can use the resumeAfter() method to resume notifications after the operation specified in the resume token. The resumeAfter() method takes a value that must resolve to a resume token, e.g. resumeToken in the example below.

          BsonDocument resumeToken = next.getResumeToken();
          cursor = inventory.watch().resumeAfter(resumeToken).iterator();
          next = cursor.next();
          

          You can use the resumeAfter option to resume notifications after the operation specified in the resume token. The resumeAfter option takes a value that must resolve to a resume token, e.g. resumeToken in the example below.

          const collection = db.collection('inventory');
          const changeStream = collection.watch();
          
          let newChangeStream;
          changeStream.once('change', next => {
            const resumeToken = changeStream.resumeToken;
            changeStream.close();
          
            newChangeStream = collection.watch({ resumeAfter: resumeToken });
            newChangeStream.on('change', next => {
              processChange(next);
            });
          });
          

          You can use the resumeAfter option to resume notifications after the operation specified in the resume token. The resumeAfter option takes a value that must resolve to a resume token, e.g. $resumeToken in the example below.

          $resumeToken = $changeStream->getResumeToken();
          
          if ($resumeToken === null) {
              throw new \Exception('Resume token was not found');
          }
          
          $changeStream = $db->inventory->watch([], ['resumeAfter' => $resumeToken]);
          $changeStream->rewind();
          
          $firstChange = $changeStream->current();
          

          You can use the resume_after modifier to resume notifications after the operation specified in the resume token. The resume_after modifier takes a value that must resolve to a resume token, e.g. resume_token in the example below.

          resume_token = cursor.resume_token
          cursor = db.inventory.watch(resume_after=resume_token)
          document = await cursor.next()
          

          In the example below, the resumeAfter option is appended to the stream options to recreate the stream after it has been destroyed. Passing the _id to the change stream attempts to resume notifications starting after the operation specified.

          stream = mongoc_collection_watch (collection, pipeline, NULL);
          if (mongoc_change_stream_next (stream, &change)) {
             resume_token = mongoc_change_stream_get_resume_token (stream);
             BSON_APPEND_DOCUMENT (&opts, "resumeAfter", resume_token);
          
             mongoc_change_stream_destroy (stream);
             stream = mongoc_collection_watch (collection, pipeline, &opts);
             mongoc_change_stream_next (stream, &change);
             mongoc_change_stream_destroy (stream);
          } else {
             if (mongoc_change_stream_error_document (stream, &error, NULL)) {
                MONGOC_ERROR ("%s\n", error.message);
             }
          
             mongoc_change_stream_destroy (stream);
          }
          

          In the example below, the resumeToken is retrieved from the last change stream document and passed to the Watch() method as an option. Passing the resumeToken to the Watch() method directs the change stream to attempt to resume notifications starting after the operation specified in the resume token.

            var resumeToken = previousCursor.GetResumeToken();
            var options = new ChangeStreamOptions { ResumeAfter = resumeToken };
            var cursor = inventory.Watch(options);
            cursor.MoveNext();
            var next = cursor.Current.First();
            cursor.Dispose();
          

          You can use the resume_after modifier to resume notifications after the operation specified in the resume token. The resume_after modifier takes a value that must resolve to a resume token, e.g. resume_token in the example below.

            change_stream = inventory.watch
            cursor = change_stream.to_enum
            next_change = cursor.next
            resume_token = change_stream.resume_token
          
            new_cursor = inventory.watch([], resume_after: resume_token).to_enum
            resumed_change = new_cursor.next
          

          You can use ChangeStreamOptions.SetResumeAfter to specify the resume token for the change stream. If the resumeAfter option is set, the change stream resumes notifications after the operation specified in the resume token. The SetResumeAfter takes a value that must resolve to a resume token, e.g. resumeToken in the example below.

          resumeToken := original.ResumeToken()
          
          cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetResumeAfter(resumeToken))
          require.NoError(t, err)
          defer cs.Close(ctx)
          
          ok = cs.Next(ctx)
          result := cs.Current
          

          You can use the resumeAfter option to resume notifications after the operation specified in the resume token. The resumeAfter option takes a value that must resolve to a resume token, e.g. resumeToken in the example below.

          let inventory = db.collection("inventory")
          let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
          let next = changeStream.next()
          
          let resumeToken = changeStream.resumeToken
          let resumedChangeStream = try inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken))
          let nextAfterResume = resumedChangeStream.next()
          

          You can use the resumeAfter option to resume notifications after the operation specified in the resume token. The resumeAfter option takes a value that must resolve to a resume token, e.g. resumeToken in the example below.

          let inventory = db.collection("inventory")
          
          inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
              .flatMap { changeStream in
                  changeStream.next().map { _ in
                      changeStream.resumeToken
                  }.always { _ in
                      _ = changeStream.kill()
                  }
              }.flatMap { resumeToken in
                  inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken)).flatMap { newStream in
                      newStream.forEach { event in
                          // process event
                          print(event)
                      }
                  }
              }
          

          startAfter for Change Streams

          New in version 4.2.

          You can start a new change stream after a specific event by passing a resume token to startAfter when opening the cursor. Unlike resumeAfter, startAfter can resume notifications after an invalidate event by creating a new change stream. For the resume token, use the _id value of the change stream event document. See Resume Tokens for more information on the resume token.

          Important

          • The oplog must have enough history to locate the operation associated with the token or the timestamp, if the timestamp is in the past.

          Resume Tokens

          The _id value of a change stream event document acts as the resume token:

          {
             "_data" : <BinData|hex string>
          }
          

          The resume token _data type depends on the MongoDB versions and, in some cases, the feature compatibility version (fcv) at the time of the change stream’s opening/resumption (i.e. a change in fcv value does not affect the resume tokens for already opened change streams):

          MongoDB Version Feature Compatibility Version Resume Token _data Type
          MongoDB 4.2 and later “4.2” or “4.0” Hex-encoded string (v1)
          MongoDB 4.0.7 and later “4.0” or “3.6” Hex-encoded string (v1)
          MongoDB 4.0.6 and earlier “4.0” Hex-encoded string (v0)
          MongoDB 4.0.6 and earlier “3.6” BinData
          MongoDB 3.6 “3.6” BinData

          With hex-encoded string resume tokens, you can compare and sort the resume tokens.

          Regardless of the fcv value, a 4.0 deployment can use either BinData resume tokens or hex string resume tokens to resume a change stream. As such, a 4.0 deployment can use a resume token from a change stream opened on a collection from a 3.6 deployment.

          New resume token formats introduced in a MongoDB version cannot be consumed by earlier MongoDB versions.

          Tip

          Starting in MongoDB 4.2, change streams will throw an exception if the change stream aggregation pipeline modifies an event’s _id field.

          Use Cases

          Change streams can benefit architectures with reliant business systems, informing downstream systems once data changes are durable. For example, change streams can save time for developers when implementing Extract, Transform, and Load (ETL) services, cross-platform synchronization, collaboration functionality, and notification services.

          Access Control

          For deployments enforcing Authentication and authorization:

          • To open a change stream against specific collection, applications must have privileges that grant changeStream and find actions on the corresponding collection.

            { resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] }
            
          • To open a change stream on a single databases, applications must have privileges that grant changeStream and find actions on all non-system collections in a database.

            { resource: { db: <dbname>, collection: "" }, actions: [ "find", "changeStream" ] }
            
          • To open a change stream on an entire deployment, applications must have privileges that grant changeStream and find actions on all non-system collections for all databases in the deployment.

            { resource: { db: "", collection: "" }, actions: [ "find", "changeStream" ] }
            

          Event Notification

          Change streams only notify on data changes that have persisted to a majority of data-bearing members in the replica set. This ensures that notifications are triggered only by majority-committed changes that are durable in failure scenarios.

          For example, consider a 3-member replica set with a change stream cursor opened against the primary. If a client issues an insert operation, the change stream only notifies the application of the data change once that insert has persisted to a majority of data-bearing members.

          If an operation is associated with a transaction, the change event document includes the txnNumber and the lsid.

          Collation

          Starting in MongoDB 4.2, change streams use simple binary comparisons unless an explicit collation is provided. In earlier versions, change streams opened on a single collection (db.collection.watch()) would inherit that collection’s default collation.