Hi,
I am using the MongoDB change stream (C# MongoDB.Driver v2.12.0) to track changes on a single collection.
In my experimental use case, the collection stores information about the execution of threads.
A thread has two properties:
- Status - RUNNING, BLOCKED or COMPLETED
- BlockedCount - number of child threads still blocking it
During its execution, a thread can spawn child threads and stay blocked until all of its children have completed. Whenever a child thread completes its execution, it updates the database by decrementing the ‘BlockedCount’ of the parent. Once the ‘BlockedCount’ drops to 0, the parent thread should continue its execution.
What I have noticed is that the change events can be different even if the update operations are exactly the same.
What I mean by this is: if I have 1 parent thread and 3 child threads completing their execution, sometimes I receive one of these and sometimes the other:
- 3 distinct update events for the parent thread:
“Status” : “BLOCKED”, “BlockedCount” : 2
“Status” : “BLOCKED”, “BlockedCount” : 1
“Status” : “BLOCKED”, “BlockedCount” : 0
- 3 identical update events for the parent thread:
“Status” : “BLOCKED”, “BlockedCount” : 0
“Status” : “BLOCKED”, “BlockedCount” : 0
“Status” : “BLOCKED”, “BlockedCount” : 0
Is this considered normal behavior?
If it is, is there some kind of configuration that would prevent it?
Here is the code for subscribing to the change stream:
    // Only pass insert, update and replace events, then keep just the fields we need.
    var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<T>>()
        .Match(change => change.OperationType == ChangeStreamOperationType.Insert ||
                         change.OperationType == ChangeStreamOperationType.Update ||
                         change.OperationType == ChangeStreamOperationType.Replace)
        .AppendStage<ChangeStreamDocument<T>, ChangeStreamDocument<T>, ChangeStreamOutputWrapper<T>>(
            "{ $project: { '_id': 1, 'fullDocument': 1, 'ns': 1, 'documentKey': 1 }}");

    // Ask the server to include the full document on update events.
    var options = new ChangeStreamOptions
    {
        FullDocument = ChangeStreamFullDocumentOption.UpdateLookup
    };

    using (var cursor = await coll.WatchAsync(pipeline, options, cancellationToken))
    {
        await cursor.ForEachAsync(async change =>
        {
            // await some handler routine
        }, cancellationToken);
    }