The dispatching and merging policy of change streams in MongoS

I’m very interested in change streams, so I went through the documentation on the official website. However, I could not find any document that describes the internals of change streams, especially in a sharded cluster, so I read the source code in cluster_aggregate.cpp, starting from the runAggregate function in v4.0. I still don’t fully understand the details of the dispatching and merging policy, and I hope to get some help here.

Here is my basic understanding of change streams in a sharded cluster; please let me know if I’m wrong:

  1. Users open a change stream with a collection-, database- or cluster-level watch; the driver translates the watch into an aggregate command whose first stage is $changeStream.
  2. MongoS receives the command and dispatches it to all shards, regardless of whether the corresponding db/collection exists on the shard.
  3. MongoD receives the modified aggregate command and runs two steps: $match and transform. The $match stage filters the oplog through an oplog cursor; the transform stage then converts each matching oplog entry into a change stream event.
  4. MongoS receives the cursor responses and applies the merging policy.
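
As a rough illustration of step 1, the sketch below builds the kind of aggregate command document that a watch call is translated into. The helper name change_stream_command and the sample database and collection names are hypothetical; the sketch only shows the shape of the command and does not talk to a server.

```python
def change_stream_command(db, coll=None, all_cluster=False, **options):
    """Build an aggregate command document for a change stream (illustrative only)."""
    stage = dict(options)  # e.g. fullDocument="updateLookup"
    if all_cluster:
        # A cluster-wide stream is run against the admin database.
        stage["allChangesForCluster"] = True
        db, coll = "admin", None
    return {
        # A database- or cluster-level stream uses 1 in place of a collection name.
        "aggregate": coll if coll is not None else 1,
        "pipeline": [{"$changeStream": stage}],
        "cursor": {},
        "$db": db,
    }


collection_cmd = change_stream_command("shop", "orders", fullDocument="updateLookup")
database_cmd = change_stream_command("shop")
cluster_cmd = change_stream_command("shop", all_cluster=True)
```

Any further user-supplied pipeline stages would follow the $changeStream stage in the pipeline array.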

I have some questions about the dispatching and merging policy:

  1. Since v3.6, MongoDB uses a logical time as the timestamp to maintain causal consistency, so different MongoS instances may have different timestamps. For a change stream, MongoS uses its local logical time if no afterClusterTime option is given; can this cause data loss? For example, suppose mongos1’s timestamp is 10:00, mongos2’s is 10:02, shard1’s is 09:59 and shard2’s is 10:01. If a user sends the change stream command to mongos1, mongos1 will send the aggregate command to each shard with a start time of 10:00, and the shard1 oplog entries from 09:59 to 10:00 will be lost. I also found the Jira ticket SERVER-31767. Does it mean this problem has been solved since v4.1.1 by global point-in-time reads?
  2. About the merging step: mongos merges the different cursors into one before returning results to the user. The merge orders events by resume token, which includes clusterTime, documentKey (shard key) and UUID. Is there any “wait policy” in mongos before it sorts the change stream events? For example, if shard1 returns events with ts=05:10, will mongos return them to the user immediately, or will it wait until all oplog entries older than 05:10 have been received from the other shards before replying? If not, causal consistency across a chunk move cannot be guaranteed.

Hi Vinllen,

Thank you for your questions, and for your interest in change streams!

Your four-point outline of how change streams operate is broadly accurate. There are a couple of additional stages which may or may not be present depending on whether (for instance) you provided a token at which to resume the stream, or whether you specified the {fullDocument: "updateLookup"} option. Below, I have listed all the stages that $changeStream expands to, and where they run in a sharded context.

The following stages run on the shards:

DocumentSourceOplogMatch: filters the oplog for all relevant events
DocumentSourceChangeStreamTransform: transforms oplog entries into equivalent change stream entries
DocumentSourceCheckInvalidate: determines whether the current event should invalidate the stream, e.g. a collection or database drop
DocumentSourceCheckShardResumability: if a resume point was specified, checks that the shard's oplog history goes back at least that far

The following stages run on mongoS:

DocumentSourceEnsureResumeTokenPresent: if a resume token was specified, this stage verifies that it appears in the resumed stream.
DocumentSourceCloseCursor: after an invalidate, ensures that the cursor is closed and cleaned up correctly.
DocumentSourceLookupChangePostImage: if {fullDocument: "updateLookup"} was specified, obtains and adds the full document into all change stream events of {operationType: "update"}

Question 1:

A change stream which is opened with no explicit startAtOperationTime, resumeAfter or startAfter is a request to start the stream at the current time and to return all events which occur from that point on. This is why the mongoS adds the current clusterTime to the request it sends to the shards - so that all shards will begin returning results from the same moment in time. Bear in mind that the cluster-wide global logical clock works by attaching the current clusterTime to every message sent between members of the cluster; so in your example, when mongoS1 sends the $changeStream to shard1, it will also send {clusterTime: 10:00} along with the request. When shard1 receives the request and sees that its local clusterTime is outdated, it will adopt the mongoS' clusterTime instead. Therefore, the next oplog write on shard1 after it receives the $changeStream request will jump from 09:59 to 10:01 or later, and will be picked up by the stream.
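
To make the clock behaviour concrete, here is a minimal toy model of it. The LogicalClock class and its method names are hypothetical, and plain integers (minutes since midnight) stand in for real cluster times, so treat it as a sketch of the idea rather than the server's implementation.

```python
class LogicalClock:
    """Toy cluster-time clock; integer ticks stand in for real cluster times."""

    def __init__(self, now):
        self.now = now

    def observe(self, received):
        # Adopt the sender's cluster time if it is ahead of the local one.
        self.now = max(self.now, received)

    def tick(self):
        # Each new oplog write gets a time strictly after the current one.
        self.now += 1
        return self.now


# Minutes since 00:00, mirroring the example: 09:59 -> 599, 10:00 -> 600.
shard1 = LogicalClock(now=599)
mongos1_time = 600

shard1.observe(mongos1_time)  # clusterTime arrives with the $changeStream request
next_write = shard1.tick()    # first oplog write on shard1 after the request

print(next_write)  # 601, i.e. 10:01, which is after the stream's start time
```

Because the shard advances its clock before its next write, no write on shard1 after the request can carry a time earlier than the stream's starting point.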

To understand why mongoS explicitly sets the startAtOperationTime, consider what would happen in your example if we did not do so. If the mongoS simply passed the request on to the shards, then shard1 would start reporting events at 09:59 and shard2 would start reporting at 10:01. But what if shard2 also had some events that occurred between 09:59 and 10:01, and which should therefore sort between the events from shard1? We would never see those events, which would violate change streams’ guarantee that no events in the cluster-wide sorted stream will ever be omitted.

Similarly, it would be semantically incorrect for mongoS to read the current most-recent optime of each shard and choose either the earliest or latest as its starting point. Say the most recent event on shardA was at time T, the most recent event on shardB was at T+1, and the mongoS' current clusterTime is T+2. As discussed above, if the user opens a stream with no resume options then they are requesting a stream that returns everything from now on, i.e. everything that happens after T+2. If we were to consult the shards and start at either T or T+1, then we would be violating this request; we would be starting the stream at a point in the past, which is not what the user asked for. The only way to start a stream at a point in the past is to explicitly supply one of the startAtOperationTime, resumeAfter, or startAfter options.

Finally, the ticket SERVER-31767 is not relevant to this issue. SERVER-31767 concerns storage-layer changes necessary to facilitate global reads at a specific point-in-time, but change streams do not use this feature - they read sequentially from the oplog, a collection whose history is always present.

Question 2:

Yes, there is a “wait policy” for change streams on mongoS; it cannot return an event from any shard until all other shards have caught up with that event. We implement this by tracking the minimum promised sort key from each shard. Each time we receive a response from a shard, the response includes a field called postBatchResumeToken, which is a promise that the shard will never produce an event that sorts earlier than that key. Before we can return the next event to the user, we must ensure that its sort key is lower than or equal to the minimum promised sort key across all shards. The minimum promised sort key can advance even if no events are returned from a particular shard, so an inactive shard cannot indefinitely block events from other shards from being returned to the user.
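
The waiting rule can be sketched roughly as follows. ShardCursor and ready_events are hypothetical names, integer sort keys stand in for real resume tokens, and the sketch assumes each shard delivers its own events in sorted order; it illustrates the rule described above, not the actual mongoS code.

```python
from collections import deque


class ShardCursor:
    """Buffered events plus the latest promise received from one shard."""

    def __init__(self):
        self.events = deque()  # sort keys, assumed to arrive in sorted order
        self.promised = 0      # shard will not produce anything sorting before this

    def receive(self, events, post_batch_resume_token):
        self.events.extend(events)
        self.promised = max(self.promised, post_batch_resume_token)


def ready_events(shards):
    """Pop every buffered event that no shard can still precede."""
    low_water = min(s.promised for s in shards.values())
    out = []
    while True:
        heads = [(s.events[0], name) for name, s in shards.items() if s.events]
        if not heads:
            break
        key, name = min(heads)
        if key > low_water:
            break  # some shard might still return an earlier event
        shards[name].events.popleft()
        out.append((key, name))
    return out


shards = {"shard1": ShardCursor(), "shard2": ShardCursor()}

shards["shard1"].receive([510], post_batch_resume_token=510)
first = ready_events(shards)   # shard2 has promised nothing yet, so nothing is released

shards["shard2"].receive([], post_batch_resume_token=512)
second = ready_events(shards)  # the empty batch still advanced shard2's promise
```

Here the event at 510 is held back until shard2 reports a promise of 512, even though shard2 returned no events of its own.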

Hope that helps! Please let me know if you have any further questions.

Best regards,
Bernard

Thank you so much for your reply; it really helps me understand change streams. Change streams are a great feature!
