Can Change Events be considered unique?

Hi,

I am utilizing the MongoDb change stream (C# MongoDB.Driver v2.12.0) to track changes on a single collection.
In my experimental use case the collection stores information about execution of threads.
A thread has two properties:

  • Status - RUNNING, BLOCKED or COMPLETED
  • BlockedCount - number of blocking threads

During its execution, a thread can spawn children threads and be blocked until all of the children are not completed. Whenever a children thread completes its execution, it updates the database by decrementing the ‘BlockedCount’ of the parent. Once the ‘BlockedCount’ drops to 0, the parent thread should continue its execution.

What I have noticed is that the change events can be different even if the update operations are exactly the same.
What I mean by this is, if I have 1 parent thread and 3 children threads completing their execution, sometimes I would receive:

  • 3 distinct update events for the parent thread:
    “Status” : “BLOCKED”, “BlockedCount” : 2
    “Status” : “BLOCKED”, “BlockedCount” : 1
    “Status” : “BLOCKED”, “BlockedCount” : 0
  • 3 identical update events for the parent thread:
    “Status” : “BLOCKED”, “BlockedCount” : 0
    “Status” : “BLOCKED”, “BlockedCount” : 0
    “Status” : “BLOCKED”, “BlockedCount” : 0

Is this considered a normal behavior, or not?
And if it is, is there some kind of configuration that would prevent this?

Here is the code for subscribing to the change stream

var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<T>>()
                    .Match(change => change.OperationType == ChangeStreamOperationType.Insert ||
                                     change.OperationType == ChangeStreamOperationType.Update ||
                                     change.OperationType == ChangeStreamOperationType.Replace)
                    .AppendStage<ChangeStreamDocument<T>, ChangeStreamDocument<T>, ChangeStreamOutputWrapper<T>>("{ $project: { '_id': 1, 'fullDocument': 1, 'ns': 1, 'documentKey': 1 }}");

var options = new ChangeStreamOptions
{
    FullDocument = ChangeStreamFullDocumentOption.UpdateLookup
};

using (var cursor = await coll.WatchAsync(pipeline, options, cancellationToken))
{
    await cursor.ForEachAsync(async change =>
    {
        // await some handler routine
    }, cancellationToken);
}

@yo_adrienne @James_Kovacs

This seems to me a racing condition. At the end of the day events are just that and do not guarantee duration they are picked up or sequence they are processed. If there is a way to use semaphores to detect sequence of thread ends and then atomic update of db then it would solve the problem. Sorry, a bit rusty to help with the code.

1 Like

You are right @MaxOfLondon. There is a race condition in the second scenario, since the event carrying BlockedCount == 0 information should be handled only once.
The documentation states that:

The fullDocument document represents the most current majority-committed version of the updated document. The fullDocument document may vary from the document at the time of the update operation depending on the number of interleaving majority-committed operations that occur between the update operation and the document lookup.

Which basically means that I am responsible for taking care of race conditions :slight_smile:

I’m not sure there is a race condition. The changeStream event is the same when the update is the same. What may differ is the full document lookup - and that depends on how many changes have been applied to it since the update that triggered the change event. We advise to make sure and set any fields you must have at change notification event as either immutable (document key) or be part of the change/update. In other words, the only way you can know what the current blocked count was at the time of update is if you record it in a field that’s updated at the same time - which in some scenarios may be a more complex update than one you are performing right now, along with all the extra costs that come with it.

Now, if you are already setting BlockedCount during the update (as it seems like you might be) then it will reflect the correct value at the time of the update and if you are seeing something else then you may be reading the wrong field. Rather than fullDocument.BlockedCount you should be reading updatedFields but I see you are adding a $project stage which is removing everything except fullDocument . Why? Remove the $project and see how the document you get will have correct and timely BlockedCount (but not in the fullDocument field).

Asya

This topic was automatically closed 5 days after the last reply. New replies are no longer allowed.