Change stream projection, getting last resume token

The project that I work on currently works by tailing the oplog. When the project starts, it queries the oplog and obtains the last entry in the oplog before we began a series of tasks and state changes. We record this oplog position at bootstrap as we want to make sure that when we transition to a later phase that we begin processing oplog entries without lose of events.

I’m currently looking into whether we can transition this to using change streams instead. What the code is not yet designed to support is creating a change stream earlier on in the process and reusing that stream in the later phase so what I would like to do is something similar to the oplog approach where I determine a check-point in the stream and restart the stream later using that check-point.

So is there a way to query a change stream and ask for the last/most recent resumeToken? I’d like to stash this resume token in our cache and then use this in the later phase to restart the change stream specifying that resume token as the startAfter argument.

Hi @Chris_Cranford, welcome!

Generally, as you watch a collection you cache the resume token. If there’s any interruptions to the watch process, you can restore the token from the cache. For example using MongoDB Java driver:

MongoCursor<ChangeStreamDocument<Document>> cursor = collection.watch().iterator();
ChangeStreamDocument<Document> next = cursor.next();
BsonDocument resumeToken = next.getResumeToken();

You can then resume from a token as below:

cursor = collection.watch().resumeAfter(resumeToken).iterator();

You could perhaps store the resumeToken caches with some metadata information that you could query later on.

See also Resume a Change Stream.

Regards,
Wan.

2 Likes

Hi @wan

Perhaps a bit more background here might help illustrate my issue.

The Debezium project operates in 2 phases, a snapshot and a streaming phase. In a situation where the user is creating a new connector and performing the snapshot and streaming operations, we first want to record some marker to indicate that anything prior will be included in the snapshot phase and that any changes done after that marker will be picked up by the streaming phase.

In the traditional oplog scenario, we could capture the last timestamp from the oplog before we begin the snapshot so we know where we need to start tailing the oplog from when streaming begins.

Now to be clear, I’m not talking about restarts of Debezium here as we already cache the resumeToken from the change stream so that its available during restarts, which fits to what you described.

The niche problem is more to do with the notion of a user creating a new connector and how that would work during the small window between snapshot and streaming phases. What I need here is to get this marker before I begin the snapshot so that I can control the point where the change stream should effectively start when streaming begins. In other words, I need to guarantee that whatever happens to the database while snapshotting is running is later captured during streaming.

It doesn’t sound like with how the API is written there is such a way to obtain this marker as we have traditionally done so by basically getting the last event in the oplog before we begin snapshotting.

If that is all true and correct, I think there is maybe only a single option and that is we would need to open the change stream where we would normally get the last event from the oplog and somehow provide this stream to the streaming phase rather than opening it up later like we’re doing.

It seems unfortunate there is no way to open a change stream and issue some type of projection or query to have it return the last event in the stream.

Hi @Chris_Cranford,

Thanks for providing more context.

If that’s your use case, you could utilise startAtOperationTime instead.

Available in MongoDB 4.0+, you can specify a startAtOperationTime to open the cursor at a particular point in time (timestamp). Just make sure the time range is within the oplog range. Using MongoDB Java driver (v3.12) as an example. :

ChangeStreamIterable cs = collection.watch();
// Unix timestamp in seconds, with increment 1
cs = cs.startAtOperationTime(new BsonTimestamp(1587009106, 1));
MongoCursor<ChangeStreamDocument> cursor = cs.iterator();

See also Change Event’ s clusterTime field.

Regards,
Wan.

2 Likes

Awesome, I think I understand. I’ll give this a shot next week and report back with how that works.

Hi @wan
I’m faced with this issue. But I’m using AWS DocumentDB and it only compatibility with MongoDB 3.6.

Hi @chu_quang, and welcome to the forum!

AWS DocumentDB API is an emulation of MongoDB which differs in features, compatibility, and implementation from an actual MongoDB deployment. AWS DocumentDB suggestion of API version support (eg 3.6) is referring to the wire protocol used rather than the full MongoDB feature set for that version.

For further questions on AWS DocumentDB I’d suggest to contact AWS.

If you want to use the latest MongoDB features and drivers without emulation I’d strongly recommend to use MongoDB Atlas.

Regards,
Wan.

2 Likes

Thank for your support. I will contact AWS.

Hi @Chris_Cranford were you able to resolve this issue/approach?

Taking a guess, is this for the Debezium mongodb connector for allowing to load an entire collection upon creation + then start with the CDC via change streams…?

Is this “snapshot.mode”: “initial” of Debezium connector for MongoDB :: Debezium Documentation?

Thanks for these comments. I found the blog very useful.

1 Like