Change Streams - Swift Driver + Vapor

Hi Everyone

I am trying to implement change streams in the Swift Driver using Vapor.

Unfortunately I am unable to work out how to implement it having looked at the Swift Changes Streams docs. I have placed my code in configure.swift but I am currently getting a crash with this error:

assertion failed: cursor or change stream wasn't closed before it went out of scope: file MongoSwift/CursorCommon.swift, line 247

Below is the code I have tried starting out with which causes the crash. I have tried using .hop(to: app.client.eventLoop) in various places but I have been unable to prevent my app from crashing on start up using change streams.

let testInventory = app.testingDatabase.collection("playerratings", withType: PlayerRating.self)
testInventory.watch().flatMap { stream -> EventLoopFuture<ChangeStream<ChangeStreamEvent<MongoCollection<PlayerRating>.CollectionType>>> in
    let resumeToken = stream.resumeToken
    return testInventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken))
}
.whenFailure { _ in }

Any idea what I am doing wrong?

Thanks

Hi @Piers_Ebdon!

Change streams must be explicitly closed (by calling the kill() method) before they go out of scope to avoid leaking resources. The assertion failure is happening in a deinit method which confirms the stream has already been killed and fails otherwise.

In the example above, since you are returning the second stream, I think the issue is likely the first stream you are creating. Once you are done using the stream, you should call the kill() method which will asynchronously clean up the resources and return an EventLoopFuture.

I realize this is not clear from the examples in our change streams guide so I’ve opened a Jira ticket so we remember to correct that.

Hi @kmahar

So after using a change stream, I then need to close it by calling kill(), I assume that the observer watching the collection is still active right?

In regards to the example I provided, I was told that I should make sure that I use a resume stream for when the connection closes and starts up again. Still trying to get my head around when best to call a second watch method with the resume token, I guess this would be best placed when an error occurs from what I am doing with the change stream, such as inserting a new document into another collection,.

Thanks again for getting back so quickly. I’ll have another attempt at this tomorrow and will provide an update hopefully with some working code.

Hey @kmahar

So this is my updated attempt to handle using a change stream. I am trying to use it to update a few rolling averages but so far I am just updating one.

   let testInventory = app.testingDatabase.collection("playerratings", withType: PlayerRating.self)
    _ = testInventory.watch(withEventType: PlayerRating.self).flatMap { stream -> EventLoopFuture<Void> in
        stream.next().map { playerRating in
            if let playerRating = playerRating {
                let matchPlayerFilter: BSONDocument = [
                    "matchID": .objectID(playerRating.matchID),
                    "playerID": .objectID(playerRating.playerID)
                ]
               
                app.testingDatabase.collection("matchplayers", withType: MatchPlayer.self).findOne(matchPlayerFilter)
                    .hop(to: app.client.eventLoop)
                    .map { matchPlayer -> EventLoopFuture<UpdateResult?> in
                            let matchPlayer = matchPlayer!
                            let newTotalRatings = matchPlayer.totalRatings + 1
                            let averageRating = matchPlayer.averageRating * Double(matchPlayer.totalRatings) + playerRating.rating / Double(newTotalRatings)
                            let updateMatchPlayer: BSONDocument = ["$set": ["totalRatings": .init(integerLiteral: newTotalRatings),
                                                                            "averageRating": .double(averageRating)]]
                            return app.testingDatabase.collection("matchplayers", withType: MatchPlayer.self).updateOne(filter: matchPlayerFilter, update: updateMatchPlayer)
                    }
            }
        }
        return stream.kill()
    }

I’m not entirely sure how to handle any potential error cases and have used a force unwrap which I would never do in production code.

Secondly, I haven’t used a resume token which I was told I should do but I don’t know where / when to use them.

Also, another grey area is whether the way I handle the change stream would ensure that the integrity of the data is maintained, as I’m thinking that whilst one call is finishing. another call (change stream) might come in before the previous one has finished.

Hello!

Just to make sure I understand what you are trying to do correctly: you are watching one collection for changes (player ratings) and on certain types of events (perhaps whenever a new document is inserted to the PlayerRating collection?) you’d like to update a corresponding collection MatchPlayers which is tracking aggregated data. is that correct?

Based on what you’ve said you may want to consider using a MongoDB view instead of change streams here. This allows you to create something that behaves very similarly to a collection but has results calculated on-demand via applying an aggregation pipeline to another collection. This would enable you to only store playerRatings and not also matchRatings.

This has the downside that you always have to compute the results when you need them, but on the other hand saves you from maintaining multiple collections and recomputing every time a new player rating is inserted.

E.g. applying the following pipeline to your playerRatings collection generates the same data as that you store in matchRatings:

// Group together documents in playerratings by player and match ID and calculate averages.
let groupStage: BSONDocument = [
   "$group": [
       // group together documents where the player and match ID values match.
		"_id": ["playerID": "$playerID", "matchID": "$matchID"],
        // add a totalRatings field that adds 1 value for each matching doc.
		"totalRatings": ["$sum": 1],
        // add an average field that averages the "rating" field for each doc.
		"averageRating": ["$avg": "$rating"]
	]
]

// Restructure the output of the previous stage to look like a MatchPlayer.
let projectStage: BSONDocument = [
	"$project": [
        // these IDs are nested under _id, flatten out the structure.
		"playerID": "$_id.playerID",
        "matchID": "$_id.matchID",
        // we already projected both fields so don't need to include this too (0 means omit it).
        "_id": 0,
        // we want to pass through these two fields as-is, so use 1.
        "totalRatings": 1,
        "averageRating": 1
	]
]

You could create such a view by doing the following:

var collectionOptions = CreateCollectionOptions()
collectionOptions.viewOn = "playerratings"
collectionOptions.pipeline = [groupStage, projectStage]

db.createCollection(
	"matchplayers",
	options: collectionOptions,
	withType: MatchPlayer.self
).flatMap { view in
	// ... use view like a normal collection (read-only)
}

I think that is likely the simplest way to accomplish what you are doing here.

If that doesn’t work for your use case for whatever reason, just to clear up some things about change streams:

The way you’ve written this now only processes a single change. I’m not sure how you are calling this code, but typically a change stream is something you have open and running for a long period of time, and the stream will collect any changes that have occurred since it was opened, or in the case of a resume token, since that resume token. This approach appears to possibly create a new change stream each time you need it. You can register a callback for a change stream via ChangeStream.forEach that will execute on each event in the stream as it arrives and then just keep the stream open long term to let it handle processing events until it is killed.

No, once you kill the change stream no observer exists anymore. If you need to keep the stream around long-term you should store it somewhere and not close it until you are finished using it.

The purpose of the resume token is to allow you to create a new change stream that “resumes” wherever a previous change stream left off. Note that the driver will in many cases automatically resume a change stream for you upon e.g. a network error.

If it’s possible that the code here is being called concurrently (or if you were to switch to using forEach), then yes, you could run into issues where the callbacks chained onto the change stream event could be executing at the same time and lead to issues due from lack of synchronization.

A couple more notes about your code sample in particular:

  • The EventType you pass in when creating a change stream needs to match up with the structure of documents emitted from the ChangeStream. The default structure of those documents matches up with the ChangeStreamEvent type and would not directly match the PlayerRatings type. This helper is something we provide for the case where users supply a custom pipeline to the watch method which transforms the output documents’ structure. In cases where you are not transforming the output documents at all you probably don’t need to use the withEventType parameter at all.
  • Your code currently processes all events emitted from a change stream. Note that events are emitted for updates, deletions, insertions, etc. which I think you are likely not interested in all of: https://docs.mongodb.com/manual/reference/change-events/#change-stream-output
1 Like

Hi @kmahar

I had no idea about MongoDB views and how they work. They sound cool. Perhaps I should try and expand a bit on what I am doing to provide more context about the problem I was attempting to solve with change streams.

I am creating an app that connects certain football (soccer) YouTube channels with their subscribers. The first feature I am looking to provide is to allow the subscribers to rate the players in their team after a match has been played.

When a subscriber (fan) submits their rating for a player, it will be used to calculate 3 rolling averages:

  1. The average for that player in a match, to be used by the channel and calculated from all their subscriber ratings

  2. The long term average for each player, to be used by the channel and calculated from all the ratings for that player, from all their subscribers, for every match that player has played.

  3. The long term average of that player, to be used by the subscriber, from all the ratings that the subscriber has created for that player.

The predicament I have is how to update these averages, particularly for the channel, without blocking anything, whilst ensuring the data maintains it’s integrity and ideally a solution whose performance can scale regardless of whether a channel had 10 subscribers or a million subscribers.

I was told change streams could be a solution for this but I have found them rather tricky to get my head around and implement as demonstrated by my attempts above. I was only looking to react to when a player rating was inserted. having to worry about closing the connection, running the code from configure.swift (Vapor project) and not from an api call, somehow handling error etc adds to the complexity for me.

views sound good but I wonder whether they would work when a rolling average is needed to be updated in 3 different places when a player rating is inserted?

I was thinking that a simple potential solution could be to use the $inc operator to increase the total number of ratings and the total rating value in each related document and then calculate the averages client side? I was thinking that using the $inc operator would have a minimal impact in blocking a document from being written or read to? However this solution doesn’t seem optimal but may be best considering my limited MongoDB skills :joy:

Apologies for the long reply and hopefully I have made some sense. Please let me know If I haven’t, which is most likely!!

I keep sounding like a broken record but I really appreciate all the help and feedback. I usually don’t like asking for help but your colleagues and yourself make the experience of doing so a breeze!