Hi Vinllen,
Thank you for your questions, and for your interest in change streams!
Your four-point outline of how change streams operate is broadly accurate. There are a couple of additional stages which may or may not be present depending on whether (for instance) you provided a token at which to resume the stream, or whether you specified the {fullDocument: "updateLookup"} option. Below, I have listed all the stages that $changeStream expands to, and where they run in a sharded context.
The following stages run on the shards:
DocumentSourceOplogMatch: filters the oplog for all relevant events
DocumentSourceChangeStreamTransform: transforms oplog entries into equivalent change stream entries
DocumentSourceCheckInvalidate: determines whether the current event should invalidate the stream, e.g. a collection or database drop
DocumentSourceCheckShardResumability: if a resume point was specified, checks that the shard's oplog history goes back at least that far
The following stages run on mongoS:
DocumentSourceEnsureResumeTokenPresent: if a resume token was specified, this stage verifies that it appears in the resumed stream.
DocumentSourceCloseCursor: after an invalidate, ensures that the cursor is closed and cleaned up correctly.
DocumentSourceLookupChangePostImage: if {fullDocument: "updateLookup"} was specified, obtains and adds the full document into all change stream events of {operationType: "update"}
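To make the conditional stages easier to see, here is a minimal Python sketch that just restates the two lists above as data. The function name and its three boolean flags are hypothetical and are not part of any MongoDB API; the ordering within each list simply follows the order in which the stages are listed in this post, and treating a resume token as one kind of resume point is an assumption.

```python
from typing import List, Tuple


def change_stream_stages(
    has_resume_point: bool = False,
    has_resume_token: bool = False,
    update_lookup: bool = False,
) -> Tuple[List[str], List[str]]:
    """Return (stages on the shards, stages on mongoS) as listed in this post.

    Hypothetical helper for illustration only; the flags are assumptions.
    """
    shard_stages = [
        "DocumentSourceOplogMatch",
        "DocumentSourceChangeStreamTransform",
        "DocumentSourceCheckInvalidate",
    ]
    # Assumption: a resume token counts as one kind of resume point.
    if has_resume_point or has_resume_token:
        shard_stages.append("DocumentSourceCheckShardResumability")

    mongos_stages = []
    if has_resume_token:
        mongos_stages.append("DocumentSourceEnsureResumeTokenPresent")
    mongos_stages.append("DocumentSourceCloseCursor")
    if update_lookup:
        mongos_stages.append("DocumentSourceLookupChangePostImage")

    return shard_stages, mongos_stages


# A stream opened with no resume options and no updateLookup.
plain_shard, plain_mongos = change_stream_stages()
# A stream resumed from a token with {fullDocument: "updateLookup"}.
full_shard, full_mongos = change_stream_stages(has_resume_token=True, update_lookup=True)
```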
Question 1:
A change stream which is opened with no explicit startAtOperationTime, resumeAfter or startAfter is a request to start the stream at the current time and to return all events which occur from that point on. This is why the mongoS adds the current clusterTime to the request it sends to the shards - so that all shards will begin returning results from the same moment in time. Bear in mind that the cluster-wide global logical clock works by attaching the current clusterTime to every message sent between members of the cluster; so in your example, when mongoS1 sends the $changeStream to shard1, it will also send {clusterTime: 10:00} along with the request. When shard1 receives the request and sees that its local clusterTime is outdated, it will adopt the mongoS' clusterTime instead. Therefore, the next oplog write on shard1 after it receives the $changeStream request will jump from 09:59 to 10:01 or later, and will be picked up by the stream.
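The clock behaviour in your example can be sketched in a few lines of Python. This is a toy model only: the Node class and its methods are hypothetical, and the clusterTime is simplified to a whole number of minutes since midnight, whereas the real value is a timestamp with a finer-grained counter.

```python
def fmt(minutes: int) -> str:
    """Render minutes since midnight as HH:MM."""
    return f"{minutes // 60:02d}:{minutes % 60:02d}"


class Node:
    """Toy cluster member with a logical clock (hypothetical, illustration only)."""

    def __init__(self, name: str, cluster_time: int) -> None:
        self.name = name
        self.cluster_time = cluster_time

    def receive(self, message_cluster_time: int) -> None:
        # Every message carries the sender's clusterTime; adopt it if it is ahead.
        self.cluster_time = max(self.cluster_time, message_cluster_time)

    def write_oplog(self) -> int:
        # Each write advances the clock past anything this node has seen so far.
        self.cluster_time += 1
        return self.cluster_time


mongos1 = Node("mongoS1", 10 * 60)      # clusterTime 10:00
shard1 = Node("shard1", 9 * 60 + 59)    # clusterTime 09:59

# mongoS1 sends $changeStream to shard1 along with {clusterTime: 10:00}.
shard1.receive(mongos1.cluster_time)
next_write = shard1.write_oplog()
print(fmt(next_write))
```

In this model the first write on shard1 after the request lands at 10:01, which is after the 10:00 start point and therefore visible to the stream.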
To understand why mongoS explicitly sets the startAtOperationTime, consider what would happen in your example if we did not do so. If the mongoS simply passed the request on to the shards, then shard1 would start reporting events at 09:59 and shard2 would start reporting at 10:01. But what if shard2 also had some events that occurred between 09:59 and 10:01, and which should therefore sort between the events from shard1? We would never see those events, which would violate change streams’ guarantee that no events in the cluster-wide sorted stream will ever be omitted.
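The gap described above can be reproduced with a small simulation. The oplog contents, event labels and the merged_stream helper are all made up for illustration, and times are again simplified to minutes since midnight (599 is 09:59, 600 is 10:00, and so on).

```python
import heapq

# Hypothetical oplogs: (time in minutes since midnight, event label).
oplogs = {
    "shard1": [(599, "s1-a"), (602, "s1-b")],
    "shard2": [(600, "s2-a"), (601, "s2-b"), (603, "s2-c")],
}


def merged_stream(oplogs, start_times):
    """Merge each shard's events at or after its start time into one sorted stream."""
    per_shard = [
        [entry for entry in oplogs[shard] if entry[0] >= start_times[shard]]
        for shard in sorted(oplogs)
    ]
    return [label for _, label in heapq.merge(*per_shard)]


# Each shard picks its own starting point: shard1 at 09:59, shard2 at 10:01.
uncoordinated = merged_stream(oplogs, {"shard1": 599, "shard2": 601})

# mongoS supplies one common startAtOperationTime of 10:00 to every shard.
coordinated = merged_stream(oplogs, {"shard1": 600, "shard2": 600})

print(uncoordinated)  # s2-a is missing even though it sorts between s1-a and s1-b
print(coordinated)    # every event from 10:00 onward, with no gaps
```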
Similarly, it would be semantically incorrect for mongoS to read the current most-recent optime of each shard and choose either the earliest or latest as its starting point. Say the most recent event on shardA was at time T, the most recent event on shardB was at T+1, and the mongoS' current clusterTime is T+2. As discussed above, if the user opens a stream with no resume options then they are requesting a stream that returns everything from now on, i.e. everything that happens after T+2. If we were to consult the shards and start at either T or T+1, then we would be violating this request; we would be starting the stream at a point in the past, which is not what the user asked for. The only way to start a stream at a point in the past is to explicitly supply one of the startAtOperationTime, resumeAfter, or startAfter options.
Finally, the ticket SERVER-31767 is not relevant to this issue. SERVER-31767 concerns storage-layer changes necessary to facilitate global reads at a specific point-in-time, but change streams do not use this feature - they read sequentially from the oplog, a collection whose history is always present.
Question 2:
Yes, there is a “wait policy” for change streams on mongoS; it cannot return an event from any shard until all other shards have caught up with that event. We implement this by tracking the minimum promised sort key from each shard. Each time we receive a response from a shard, the response includes a field called postBatchResumeToken, which is a promise that the shard will never produce an event that sorts earlier than that key. Before we can return the next event to the user, we must ensure that its sort key is lower than or equal to the minimum promised sort key across all shards. The minimum promised sort key can advance even if no events are returned from a particular shard, so an inactive shard cannot indefinitely block events from other shards from being returned to the user.
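As a rough illustration of this wait policy, here is a minimal Python sketch. It is not the server's implementation: the ShardMerger class and its methods are hypothetical, and sort keys are simplified to positive integers, whereas the real sort key is derived from the resume token.

```python
import heapq


class ShardMerger:
    """Toy model of the mongoS merge with a minimum promised sort key (hypothetical)."""

    def __init__(self, shard_names):
        # Assumption: keys are positive integers, so 0 means "nothing promised yet".
        self.promised = {name: 0 for name in shard_names}
        self.buffer = []  # min-heap of (sort_key, shard, event)

    def on_response(self, shard, events, post_batch_resume_token):
        """Record a shard response: its events plus its promised sort key."""
        for sort_key, event in events:
            heapq.heappush(self.buffer, (sort_key, shard, event))
        # The promise only ever moves forward, even when the batch is empty.
        self.promised[shard] = max(self.promised[shard], post_batch_resume_token)

    def ready(self):
        """Pop every buffered event whose key is <= the minimum promise across shards."""
        limit = min(self.promised.values())
        released = []
        while self.buffer and self.buffer[0][0] <= limit:
            released.append(heapq.heappop(self.buffer))
        return released


merger = ShardMerger(["shardA", "shardB"])

# shardA returns one event; shardB has not yet promised anything.
merger.on_response("shardA", [(5, "insert x")], post_batch_resume_token=5)
held_back = merger.ready()

# shardB is idle but still advances its promise with an empty batch.
merger.on_response("shardB", [], post_batch_resume_token=7)
released = merger.ready()
```

The event from shardA is held back until shardB promises a key of at least 5; an empty batch from shardB is enough to release it.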
Hope that helps! Please let me know if you have any further questions.
Best regards,
Bernard