Change Stream Watch Cursor

When I open a changeStream on a specific collection, I can see in my debug logs that a ‘getMore’ db command is issued every now and then, and it keeps querying the db across the lifetime of the cursor.
If this is the case, how are we subscribing to events then? Shouldn't Mongo be emitting events out, rather than the observer issuing ‘getMore’?
I am using mongo-scala-driver.

Thank you for your thoughts!

Hi @Atil_Pai,

MongoDB Change Streams are an abstraction over a TAILABLE_AWAIT cursor, with support for resumability. Conceptually it is equivalent to the Unix tail command in “follow” mode. The client uses the getMore command to retrieve batches of documents/results currently pointed to by the cursor. The cursor waits for a few seconds after returning a full result set so that it can capture and return additional data added during a query.
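
As an illustration of what the driver is doing, here is a minimal sketch with the Java driver (the mongo-scala-driver wraps the same core driver, so the behaviour is the same); the connection string, database, and collection names are placeholders for this example only:

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import org.bson.Document;

public class WatchSketch {
    public static void main(String[] args) {
        // Placeholder connection string, database, and collection names.
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            try (MongoCursor<ChangeStreamDocument<Document>> cursor =
                         client.getDatabase("test").getCollection("orders").watch().iterator()) {
                // hasNext() blocks until the next batch is available; under the hood the
                // driver serves it by issuing getMore commands on the tailable-await cursor.
                while (cursor.hasNext()) {
                    ChangeStreamDocument<Document> change = cursor.next();
                    System.out.println(change.getOperationType() + ": " + change.getDocumentKey());
                }
            }
        }
    }
}

So even though the application code reads like an event subscription, the events are delivered in batches that the driver pulls with getMore; that is the command you are seeing in your debug logs.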

Regards,
Wan.

Thanks for the reply, Wan. I was initially concerned with the getMore command because our tailable cursor implementation on the Oplog issues a lot of getMore commands, but in the case of Change Streams it is proportional to the number of change stream cursors opened, which is great and helps reduce a lot of the load that we previously used to experience.

Hi @wan, I have been thinking about my last comment some more. Does it sound right to you? Is a Change Stream more performant than tailing the Oplog, especially in terms of the getMore command load added onto the DB?

Thanks.

Hi @Atil_Pai,

Without knowing more about your code implementation, you’re probably utilising a TAILABLE cursor. That is basically a cursor that is not closed when the last data is retrieved but is kept open, with the cursor location marking the final document position. If more data is seen, iteration of the cursor will continue from the last document seen.

Change Stream utilises TAILABLE_AWAIT, which is a tailable cursor with the await option set. It’s a cursor that will wait for a few seconds after returning a full result set, so that it can capture and return additional data added during a query. Depending on the use case, this could potentially be more efficient in terms of data round trips. See also MongoDB Specifications: Change Streams for more information on the MongoDB driver specs for the Change Stream implementation.
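
To make the difference between the two cursor types concrete, here is a rough sketch with the Java driver; the connection string, the capped collection name, and the maxAwaitTime value are assumptions for illustration only:

import com.mongodb.CursorType;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import org.bson.Document;

import java.util.concurrent.TimeUnit;

public class TailableCursorSketch {
    public static void main(String[] args) {
        // Hypothetical connection string and capped collection name.
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            MongoCollection<Document> events =
                    client.getDatabase("test").getCollection("events");

            // TAILABLE: the cursor stays open after the last document, but a getMore
            // issued when no new data is available returns an empty batch immediately.
            FindIterable<Document> tailable =
                    events.find().cursorType(CursorType.Tailable);

            // TAILABLE_AWAIT: the server holds each getMore open for up to maxAwaitTime,
            // returning as soon as new data arrives, so there are fewer empty round trips.
            FindIterable<Document> tailableAwait =
                    events.find()
                          .cursorType(CursorType.TailableAwait)
                          .maxAwaitTime(10, TimeUnit.SECONDS);

            try (MongoCursor<Document> cursor = tailableAwait.iterator()) {
                while (cursor.hasNext()) {
                    System.out.println(cursor.next().toJson());
                }
            }
        }
    }
}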

In addition, Change Stream is more than just tailing the Oplog. Key benefits of Change Streams over tailing Oplog are:

  1. Utilise the built-in MongoDB Role-Based Access Control. Applications can only open change streams against collections they have read access to. This provides refined and specific authorisation.

  2. Provide a well-defined API that is reliable. The change events returned by change streams are well documented. Also, all of the official MongoDB drivers follow the same specification when implementing the change streams interface, while the entries in the Oplog may change between MongoDB major versions.

  3. Change events returned by change streams have been committed to a majority of the replica set. This means the change events sent to the client are durable. Applications don’t need to handle data rollback in the event of a failover.

  4. Provide a total ordering of changes across shards by utilising a global logical clock. MongoDB guarantees that the order of changes is preserved and that change events can be safely interpreted in the order received. For example, a change stream cursor opened against a 3-shard sharded cluster returns change events respecting the total order of those changes across all three shards.

  5. Due to this ordering characteristic, change streams are also inherently resumable. The _id of the change event output is a resume token. The official MongoDB drivers automatically cache this resume token, and in the case of a transient network error the driver will retry once. Additionally, applications can also resume manually by utilising the parameters resume_after and start_after. See also Resume a Change Stream.

  6. Utilise the MongoDB aggregation pipeline. Applications can modify the change events output. Currently there are five pipeline stages available to modify the event output. For example, change event output can be filtered (server side) before being sent, using the $match stage. See Modify Change Stream Output for more information. A short sketch combining this with resume tokens follows this list.
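
As an illustration of points 5 and 6, here is a minimal Java-driver sketch that applies a server-side $match stage and then resumes from a cached resume token; the namespace, the filter, and the stop/restart flow are made up for this example:

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import org.bson.BsonDocument;
import org.bson.Document;
import org.bson.conversions.Bson;

import java.util.Arrays;
import java.util.List;

public class ResumableFilteredStream {
    public static void main(String[] args) {
        // Hypothetical connection string and namespace.
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            MongoCollection<Document> orders =
                    client.getDatabase("test").getCollection("orders");

            // The $match stage runs server side, so only insert and update events reach the client.
            List<Bson> pipeline = Arrays.asList(
                    Aggregates.match(Filters.in("operationType", "insert", "update")));

            BsonDocument resumeToken = null;
            try (MongoCursor<ChangeStreamDocument<Document>> cursor = orders.watch(pipeline).iterator()) {
                if (cursor.hasNext()) {
                    ChangeStreamDocument<Document> change = cursor.next();
                    resumeToken = change.getResumeToken(); // the _id of the change event
                    System.out.println("First event: " + change.getOperationType());
                }
            }

            // Reopen the stream from the cached token rather than from "now".
            if (resumeToken != null) {
                try (MongoCursor<ChangeStreamDocument<Document>> resumed =
                             orders.watch(pipeline).resumeAfter(resumeToken).iterator()) {
                    if (resumed.hasNext()) {
                        System.out.println("Resumed event: " + resumed.next().getOperationType());
                    }
                }
            }
        }
    }
}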

I would recommend using Change Streams instead of writing custom code to tail the Oplog.

Regards,
Wan.

@wan, our legacy app has an implementation of an oplog.find DbQuery with the options Tailable, AwaitData, NoTimeout and OplogReplay. This is wrapped in a while loop with a 20-minute reset timeout on the cursor (the while loop is then repeated, along with some helper code). Do you think this configuration is fine for an Oplog implementation? Also, for an Oplog tail cursor with the DbQuery options Tailable and AwaitData, can I say its efficiency is similar to the change stream then?

Great information here, thanks!

I just have one related question.

I’m watching 3 collections in a DB using the Java driver. Each of those collections has one document only, each of which has embedded documents. My client code looks like this:

ChangeStreamIterable<Document> changes =
    client.getDatabase(<DBNAME>)
        .watch(Collections.singletonList(
            Aggregates.match(Filters.in("ns.coll", Arrays.asList(WATCHED_COLLECTIONS)))))
        .fullDocument(FullDocument.UPDATE_LOOKUP);

where the variable “WATCHED_COLLECTIONS” is an array of the 3 collection names that I want to watch.

Since I’ve used the “match” stage, this filtering should be happening on the server side, right?
Despite that, in the mongo logs, I can see that ‘docsExamined’ is very high! Why would that be happening, since there’s only one document in each collection? Even if we count all the embedded documents, it doesn’t come up to 11,000 documents. Is it also examining all the other documents that were upserted in the window between two ‘getMore’ operations?

COMMAND [conn20161] command DBNAME.$cmd command: getMore { getMore: 1760441711222280319, collection: "$cmd.aggregate", $db: "DBNAME", $clusterTime: { clusterTime: Timestamp(1590477125, 7396), signature: { hash: BinData(0, 17B8B1B3ADE3FEFC381F56E9201694DC9509BC38), keyId: 6829683829607759874 } }, lsid: { id: UUID("f88e3593-bec6-47cc-a067-6042f36aa1a3") } } originatingCommand: { aggregate: 1, pipeline: [ { $changeStream: { fullDocument: "updateLookup" } }, { $match: { ns.coll: { $in: [ "COLLECTION1", "COLLECTION2", "COLLECTION3" ] } } } ], cursor: {}, $db: "DBNAME", $clusterTime: { clusterTime: Timestamp(1590160602, 2), signature: { hash: BinData(0, 39A22239ED8BA07ED1E8B710D4212AE8CDB52663), keyId: 6829683829607759874 } }, lsid: { id: UUID("f88e3593-bec6-47cc-a067-6042f36aa1a3") } } planSummary: COLLSCAN cursorid:1760441711222280319 keysExamined:0 **docsExamined:11890** numYields:7138 nreturned:0 reslen:305 locks:{ ReplicationStateTransition: { acquireCount: { w: 7141 } }, Global: { acquireCount: { r: 7141 } }, Database: { acquireCount: { r: 7141 } }, Mutex: { acquireCount: { r: 3 } }, oplog: { acquireCount: { r: 7141 } } } storage:{ data: { bytesRead: 14 } } protocol:op_msg 351ms