
session.abortTransaction does not roll back elements inserted after startTransaction

I was finally trying out MongoDB transactions with a local single-node replica set (version 4.2.3, macOS). However, I was unable to roll back any modifications I made within a transaction. I am using the reactive Java driver, version 1.13.0.

Since I could not get the expected behaviour, I created a simple Spock test that inserts a document, aborts the transaction, and then tries to find the just-inserted document in a new session. (The new session is just to make sure; I see identical results if I add breakpoints and use the CLI to verify.)

class MongoSpecification extends Specification {

    MongoClient mongoClient;
    MongoCollection collection;

    def setup() {
        mongoClient = MongoClients.create();
        MongoDatabase db = mongoClient.getDatabase("test");
        collection = db.getCollection("test");

        AsyncConditions conditions = new AsyncConditions(1)

        // make sure to drop the db to start with a clean state
        Single.fromPublisher(db.drop()).subscribe({ s ->
            conditions.evaluate({
                s != null
            })
        }, { t -> t.printStackTrace()})
        conditions.await()
    }

    def 'test'() {
        when:
        AsyncConditions conditions = new AsyncConditions(1)
        Single.fromPublisher(mongoClient.startSession()).flatMap({session ->
            session.startTransaction();
            return Single.fromPublisher(collection.insertOne(new Document("_id", 1).append("msg", "test")))
                    .map({ s ->
                        System.out.println("aborting transaction");
                        session.abortTransaction();
                        return s;
                    })
        }).subscribe({ success ->
            System.out.println("insert result: " + success);
            conditions.evaluate({
                success != null
            })
        }, { t ->
            t.printStackTrace()
        })

        then:
        conditions.await();

        when:
        conditions = new AsyncConditions(1)

        Single.fromPublisher(mongoClient.startSession()).flatMap({session ->
            return Single.fromPublisher(collection.find(new Document("_id", 1)).first())
        }).subscribe({ document ->
            System.out.println("found document: " + document);
            conditions.evaluate({
                document != null
            })
        }, { t ->
            t.printStackTrace();
        })

        then:
        conditions.await()
    }
}

The output is:

aborting transaction
insert result: SUCCESS
found document: [_id:1, msg:test]

If I check session.hasActiveTransaction() before aborting the transaction, it returns true.

The mongo driver lists transaction support since version 1.9.0, so there must be something wrong with how I try to handle transactions within the reactive context. Unfortunately, I was unable to find any documentation regarding the use of the reactive driver together with transactions.

Does anyone have an idea what is going on here?

Hi @st-h,

I think I see the issue: session.abortTransaction() returns a publisher, which must be subscribed to for the transaction to be aborted.

Note: With the reactive streams driver all publishers are cold publishers, so nothing happens unless they are subscribed to and data is requested. In this case the abortTransaction command is never actually requested.
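
To make the point about cold publishers concrete, here is a minimal sketch. It is not driver code: it uses the JDK's java.util.concurrent.Flow interfaces (Java 9+) as a stand-in for the Reactive Streams types the driver returns, and fakeAbortTransaction and requestingSubscriber are hypothetical names made up for illustration. All it shows is that calling the method runs nothing; the work happens only once a subscriber requests data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class ColdPublisherDemo {

    // Hypothetical stand-in for session.abortTransaction(): the "command" is
    // only recorded in the log once a subscriber requests data.
    static Flow.Publisher<Void> fakeAbortTransaction(List<String> log) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean finished;

            @Override
            public void request(long n) {
                if (finished) {
                    return;
                }
                finished = true;
                log.add("abort sent");
                subscriber.onComplete();
            }

            @Override
            public void cancel() {
                finished = true;
            }
        });
    }

    // A subscriber that requests one item as soon as it is subscribed.
    static Flow.Subscriber<Void> requestingSubscriber() {
        return new Flow.Subscriber<Void>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(1); }
            @Override public void onNext(Void item) { }
            @Override public void onError(Throwable t) { t.printStackTrace(); }
            @Override public void onComplete() { }
        };
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();

        // Like calling abortTransaction() and ignoring the returned publisher.
        Flow.Publisher<Void> abort = fakeAbortTransaction(log);
        System.out.println("after the call:    " + log);  // log is still empty

        abort.subscribe(requestingSubscriber());
        System.out.println("after subscribing: " + log);  // log contains "abort sent"
    }
}
```

With RxJava, as used in this thread, wrapping the publisher in Completable.fromPublisher(...) and subscribing to the resulting chain plays the role of requestingSubscriber here.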

I hope that helps,

Ross

Hi @Ross_Lawley,

Thanks a lot for your comment. I totally overlooked that abortTransaction returns a publisher, since startTransaction just returns void. I have rewritten my code so that it subscribes to the abortTransaction publisher. I also added a breakpoint within ClientSessionImpl::abortTransaction and it looks like the callback there is called. However, I still see the same results:

static Completable insert(MongoClient mongoClient, MongoCollection<Document> collection) {
    return Single.fromPublisher(mongoClient.startSession())
            .flatMapCompletable(clientSession -> {
                clientSession.startTransaction();
                return Single.fromPublisher(collection.insertOne(new Document("_id", 1).append("msg", "test")))
                        .flatMapCompletable(success -> Completable.fromPublisher(clientSession.abortTransaction()));
    });
}

static Single<Document> findWithSession(MongoClient mongoClient, MongoCollection<Document> collection) {
    return Single.fromPublisher(mongoClient.startSession()).flatMap(clientSession -> {
        clientSession.startTransaction();
        return Single.fromPublisher(collection.find(new Document("_id", 1)).first()).map(document -> {
            clientSession.close();
            return document;
        });
    });
}

This is the Spock test that calls the two methods:

def 'test2' () {
    when: 'make sure the document is not present'
    MongoTest.findWithSession(mongoClient, collection).blockingGet()

    then:
    NoSuchElementException e1 = thrown()

    when:
    Throwable t = MongoTest.insert(mongoClient, collection).blockingGet()

    then:
    t == null

    when: 'insert transaction has been aborted, document still should not be present'
    Document document = MongoTest.findWithSession(mongoClient, collection).blockingGet()
    System.out.println("found document: " + document)

    then: 'test fails here, as no exception is thrown'
    NoSuchElementException e2 = thrown()
}

This is the full output, including info statements from the mongo driver:

2020-03-02 12:20:56.819 [INFO ] [main                      ] cluster - Cluster created with settings {hosts=[localhost:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500}
2020-03-02 12:20:56.924 [INFO ] [cluster-ClusterId{value='5e5cec18ef235923e7964245', description='null'}-localhost:27017] connection - Opened connection [connectionId{localValue:1, serverValue:295}] to localhost:27017
2020-03-02 12:20:56.929 [INFO ] [cluster-ClusterId{value='5e5cec18ef235923e7964245', description='null'}-localhost:27017] cluster - Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=REPLICA_SET_PRIMARY, state=CONNECTED, ok=true, version=ServerVersion{versionList=[4, 2, 3]}, minWireVersion=0, maxWireVersion=8, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=3316268, setName='local', canonicalAddress=127.0.0.1:27017, hosts=[127.0.0.1:27017], passives=[], arbiters=[], primary='127.0.0.1:27017', tagSet=TagSet{[]}, electionId=7fffffff0000000000000001, setVersion=1, lastWriteDate=Mon Mar 02 12:20:49 CET 2020, lastUpdateTimeNanos=433887177766044}
2020-03-02 12:20:56.999 [INFO ] [Thread-4                  ] connection - Opened connection [connectionId{localValue:2, serverValue:296}] to localhost:27017
found document: [_id:1, msg:test]

Do you have any idea what I am still doing wrong?

On another note: It would be incredibly helpful for people like me, who just started to get into the reactive world as a means to get rid of the deprecated mongo driver, if there were more examples in the docs of the reactive driver on how to do things like using transactions.

Hi @st-h,

Nearly there - you just need to pass the session to the insertOne call, so that the session is used, and then it should work as expected, e.g.:

collection.insertOne(clientSession, new Document("_id", 1).append("msg", "test"));
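
Why the missing argument matters can be shown with a toy model. ToySession and ToyCollection below are hypothetical stand-ins, not the driver's ClientSession and MongoCollection, and in the real system the uncommitted writes are tracked by the server rather than in a client-side list. The sketch only assumes what is described above: an operation belongs to the transaction only if the session is passed to it.

```java
import java.util.ArrayList;
import java.util.List;

class SessionBindingDemo {

    // Hypothetical stand-in for a client session with an open transaction.
    static final class ToySession {
        final List<String> pending = new ArrayList<>();
    }

    // Hypothetical stand-in for a collection; not the driver's MongoCollection.
    static final class ToyCollection {
        final List<String> committed = new ArrayList<>();

        // No session: the write is applied immediately, outside any transaction.
        void insertOne(String doc) {
            committed.add(doc);
        }

        // With session: the write is only staged until commit or abort.
        void insertOne(ToySession session, String doc) {
            session.pending.add(doc);
        }

        void commit(ToySession session) {
            committed.addAll(session.pending);
            session.pending.clear();
        }

        void abort(ToySession session) {
            session.pending.clear();
        }
    }

    public static void main(String[] args) {
        ToyCollection collection = new ToyCollection();
        ToySession session = new ToySession();

        collection.insertOne("without session");        // bypasses the transaction
        collection.insertOne(session, "with session");  // part of the transaction
        collection.abort(session);

        // Only the write that bypassed the session survives the abort.
        System.out.println(collection.committed);       // prints [without session]
    }
}
```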

Regarding documentation, I agree and this is something we as a team are looking at improving in the future as well as potentially a friendlier API. The main issue is using raw Publishers would make the documentation extremely verbose and confusing. However, I can see using a library like Reactor or RxJava should really reduce the complexity of the code and that may be the way forward.

All the best,

Ross

Ah, awesome. I was just debugging through ClientSession and noticed that I am dealing with different versions of this when starting and aborting the transaction and when notifyMessageSent is called. I was just thinking that I probably need to tell the operation to use the session somehow, as that somehow got lost and it wasn’t clear to me how the operation would even know about the session (there are no guarantees that it would be called from the same thread etc.)

Thanks a lot for pointing that out. I probably still would have missed it, as I would have been looking for a builder-style setter and not an optional first argument :rofl:

The error that followed was way easier to solve:
Cannot create namespace test.test in multi-document transaction.
It is caused by the statement that drops the collection in the setup of the test. After replacing it with one that removes all elements from the collection, everything works as expected. Wohooo. That was a wild ride though :smiley:

Regarding documentation, I think something like the GridFS tour for reactive streams, which could make use of RxJava or Reactor or both, would on its own be extremely helpful. Just to show some examples of how to use stuff - and it shouldn’t cause much bloat to the documentation. I think the learning curve when switching to the reactive world is quite steep, and often there is a lot of doubt about whether the reactive stuff itself is applied correctly. More examples would be totally awesome here.

Thanks again, Ross.


Ross, if possible at all, could you please add some information about why the session/transaction has been added as an optional first parameter to all methods? I would just like to understand, as this not only means having each mongo API method twice, but the duplication also continues in the consuming application (if an app mixes transactional and non-transactional queries).

As far as I understand, opening a transaction/session is not necessary to read consistent data (so that the client never sees any uncommitted writes). Speaking of a simple web application, one might want to use no transaction/session for all the methods that would only retrieve data. Atomic writes would not need a transaction as well, so the only case would be methods that perform multi document updates/inserts. For reads, the only case that would require the session seems to be when one wants to read data, that has been written in the same session, but hasn’t been committed yet?

Now, when writing some sort of data access layer, most applications that use transactions somewhere end up needing each method in two variants: one that can be used within a session/transaction and a similar one to be used without a session.

So, it’s either:

if (session) { 
  collection.find(session, ..... )
} else {
  collection.find(....)
}

or doubling those methods, to provide one that has a client session parameter and one that doesn’t (just like the mongo client does).

Long story short:
Why did you choose to not do something like:

collection.withSession(session).find(...)

I am mainly asking, because I feel like there is something fundamentally wrong with my understanding and there probably has been a very good reason for doing it this way.
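
One way to avoid both the if/else and the doubled methods in application code might be a small dispatch helper. This is only a sketch of that idea, not something the driver provides: dispatch is a hypothetical name, and the session and query types are left generic so the example runs with plain strings. In a real data access layer the two lambdas would call the session-aware and the plain overload of the same collection method.

```java
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;

class SessionDispatchDemo {

    // Hypothetical helper: calls the session-aware variant when a session is
    // present and the plain variant otherwise.
    static <S, Q, R> R dispatch(Optional<S> session, Q query,
                                BiFunction<S, Q, R> withSession,
                                Function<Q, R> withoutSession) {
        return session.isPresent()
                ? withSession.apply(session.get(), query)
                : withoutSession.apply(query);
    }

    // Stand-ins for the two overloads of a collection method such as find.
    static String find(String session, String filter) {
        return "find(" + session + ", " + filter + ")";
    }

    static String find(String filter) {
        return "find(" + filter + ")";
    }

    public static void main(String[] args) {
        Optional<String> session = Optional.of("session");
        Optional<String> noSession = Optional.empty();

        String inTransaction = dispatch(session, "{_id: 1}",
                (s, q) -> find(s, q), q -> find(q));
        String plain = dispatch(noSession, "{_id: 1}",
                (s, q) -> find(s, q), q -> find(q));

        System.out.println(inTransaction);  // prints find(session, {_id: 1})
        System.out.println(plain);          // prints find({_id: 1})
    }
}
```

The branch still exists, but it lives in one place instead of being repeated in every data access method.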