Concurrency in MongoDB transactions

Hi,

I have a stock document.

{
    "_id":  "601022171517ee00d48ce6ad",
    "caseQuantity": 4,
    "unitQuantity": 395,
    "totalQuantity": 2000,
    "currentQuantity": 1995,
    "isClaimActive": "true",
    "claim": 32,
    "status": "Active",
    "purchaseInventoryId":"601022151517ee00d48ce6ac",
    "index": "1611670005862",
    "batchNo": 1,
    "unitPrice": 14.19,
    "casePrice": 255.75,
    "product": "5f8d9a6184c1d0005814ed61",
    "productName": "Red Cow - Red Cow 18g",
    "type": "5f8d931fcc42160023d770e2",
    "units": 400,
    "agency": "5f8d6f0acc42160023d770c4",
    "createdBy": "5f8d6f2dcc42160023d770c5",
    "__v": 0
}

Imagine that two users are updating the currentQuantity of the above document at the same time.

I have used the below code to run the stock update. (Stock.updateOne)

    const { records } = req.body;
    const agency: any = req.currentUser!.agency;
    let stockItemsNoUpdate: any[] = [];
    const promiseArray: Query<any>[] = [];
    let recordCounter = -1;
    let response = {};

    const session = await mongoose.startSession();
    session.startTransaction({
      readPreference: 'primary',
      readConcern: { level: 'local' },
      writeConcern: { w: 'majority' },
    });

    const existingLoadingSheet = await LoadingSheet.findById(
      req.params.id
    ).session(session);

    if (!existingLoadingSheet) {
      session.endSession();
      throw new NotFoundError('Loading Sheet Not Found');
    }

    if (existingLoadingSheet.get('agency')._id != agency) {
      session.endSession();
      throw new AccessRestrictedError();
    }

    const oldLoadingsheetStockValuesArray = [...existingLoadingSheet?.records];
    const oldLoadingsheetStockValuesObj = oldLoadingsheetStockValuesArray.reduce(
      function (result, item) {
        var key = item.stockId;
        result[key] = {
          loadingCaseCount: item.loadingCaseCount,
          loadingUnitCount: item.loadingUnitCount,
          loadingTotal: item.loadingTotal,
          stockId: item.stockId,
          product: item.product,
          index: item.index,
          batchNo: item.batchNo,
          type: item.type,
        };
        return result;
      },
      {}
    );

    try {
      existingLoadingSheet.set({ records, isUnloaded: false });
      await existingLoadingSheet.save({ session: session });

      for (const el of records) {
        recordCounter++;
        const oldLoadingTotal =
          oldLoadingsheetStockValuesObj[el.stockId] != null
            ? oldLoadingsheetStockValuesObj[el.stockId].loadingTotal
            : 0;
        const diff_qty = el.loadingTotal - oldLoadingTotal;
        if (diff_qty === 0) {
          stockItemsNoUpdate.push({
            recordIndex: recordCounter,
            stockId: el.stockId,
            nModified: 1,
          });
        }
        promiseArray.push(
          Stock.updateOne(
            {
              _id: mongoose.Types.ObjectId(el.stockId),
              agency,
            },
            [
              {
                $set: {
                  currentQuantity: {
                    $add: [
                      '$currentQuantity',
                      {
                        $switch: {
                          branches: [
                            {
                              case: { $gt: [diff_qty, 0] },
                              then: {
                                $cond: [
                                  { $gte: ['$currentQuantity', diff_qty] },
                                  -diff_qty,
                                  0,
                                ],
                              },
                            },
                            {
                              case: { $lt: [diff_qty, 0] },
                              then: { $abs: diff_qty },
                            },
                          ],
                          default: 0,
                        },
                      },
                    ],
                  },
                },
              },
              {
                $set: {
                  unitQuantity: {
                    $mod: ['$currentQuantity', el.units],
                  },
                },
              },
              {
                $set: {
                  caseQuantity: {
                    $floor: {
                      $divide: ['$currentQuantity', el.units],
                    },
                  },
                },
              },
            ],
            { session: session }
          )
        );
      }

      const promiseResults = await Promise.all(promiseArray);
      for (const el of stockItemsNoUpdate) {
        promiseResults[el.recordIndex]['nModified'] = 1;
      }

      recordCounter = -1;
      stockItemsNoUpdate = [];

      for (const result of promiseResults) {
        recordCounter++;
        if (result.nModified === 0) {
          stockItemsNoUpdate.push(records[recordCounter]);
        }
      }

      if (stockItemsNoUpdate.length > 0) {
        await session.abortTransaction();
        session.endSession();

        response = {
          status: 'updateFailed',
          data: {
            failed: stockItemsNoUpdate,
          },
        };

        return res.status(200).send(response);
      }

      await session.commitTransaction();
      session.endSession();
    } catch (error) {
      console.log(error);
      await session.abortTransaction();
      session.endSession();
      throw new Error(
        `Error occured while trying to update the loading sheet. ${error}`
      );
    }

    response = {
      status: 'success',
      data: {
        sheet: existingLoadingSheet,
      },
    };

    res.send(response);
  }

Above code works perfectly and gives me the expected output. This is because i am the only one who is using the document. But My concern is will this avoid any concurrency issues if multiple users start using the same document simultaneously??

Some help would be much appreciated as I am new to MongoDB??

Hi @Shanka_Somasiri,

I haven’t reviewed the code. But in general when multiple transactions try to update the same documents they might acquire a lock on the updated documents, if they can’t whitin thr configured limits (default 5ms) they will abort.

Additionally other transactions might fail on write conflicts if another in progress transactions modified documents they are about to update.

More details see here:
https://docs.mongodb.com/manual/core/transactions-production-consideration/#acquiring-locks

Thanks
Pavel

1 Like

Thanks for your input @Pavel_Duchovny.

I went through the link you posted. My only concern is i run below 3 statements.

1st statement is getting a loading sheet; // line #15

const existingLoadingSheet = await LoadingSheet.findById(
      req.params.id
    ).session(session);  

2nd statement updating the loading sheet // line #48

existingLoadingSheet.set({ records, isUnloaded: false });
await existingLoadingSheet.save({ session: session });

and 3rd statement being update the stock in the warehouse;

Stock.updateOne() statement; // line #65

Acording to the https://docs.mongodb.com/manual/core/transactions-production-consideration/#acquiring-locks ; i know that my 3rd statement will cater to update the same document by multiple users.

Will the findById() along with the transaction session as shown in statement 1, cater to multiple users??

I am asking this because let’s imagine;

I have users A and B.
A and B both are looking at the same loadingsheet document (eg: 111)
Before A executes statement 2, B reads from statement 1.
By the time A finishes the statement 2 the loading sheet has new values, but unfortunately B has old values and can carry out the transaction but with wrong data.

In the above case will it be a problem??

Cheers

@Shanka_Somasiri,

We have a specific section that explains how to read and lock a document for this scenario.

https://docs.mongodb.com/manual/core/transactions-production-consideration/#in-progress-transactions-and-stale-reads

The main idea is that when you load a sheet you will adda locking field which will result in a tx lock. Therefore, other transactions will lock or abort when reading. I suggest for a more smooth experience wait a small period and retry this abort again by reloading the user view.

Once the transaction is commited by user A user B will use a read commited read isolation and will only read the data after a successful commit or rollback.

Please let me know if you have any additional questions.

Best regards,
Pavel

1 Like

Hi @Pavel_Duchovny,

This was really helpful.

But still I am not sure how I can use findOneAndUpdate() and achieve findById() functionality shown by below steps in my original code. In the code I do check;

1). whether the loading sheet exists
2). whether the sheet is related to the user’s agency
3). convert the loading item records
4). update the loadingsheet document itself
5). update stock document

const existingLoadingSheet = await LoadingSheet.findById(
      req.params.id
    ).session(session); 

if (!existingLoadingSheet) {
  session.endSession();
  throw new NotFoundError('Loading Sheet Not Found');
} // 1

if (existingLoadingSheet.get('agency')._id != agency) {
  session.endSession();
  throw new AccessRestrictedError();
} // 2

const oldLoadingsheetStockValuesArray = [...existingLoadingSheet?.records];
const oldLoadingsheetStockValuesObj = oldLoadingsheetStockValuesArray.reduce(
  function (result, item) {
	var key = item.stockId;
	result[key] = {
	  loadingCaseCount: item.loadingCaseCount,
	  loadingUnitCount: item.loadingUnitCount,
	  loadingTotal: item.loadingTotal,
	  stockId: item.stockId,
	  product: item.product,
	  index: item.index,
	  batchNo: item.batchNo,
	  type: item.type,
	};
	return result;
  },
  {}
); // 3

existingLoadingSheet.set({ records, isUnloaded: false });
await existingLoadingSheet.save({ session: session }); // 4 
 
` // and run Stock.updateOne() with session` // 5

Won’t just by running the findById(), loadingsheet.save() and Stock.updateOne() in the same transaction (with session) lock all the loading sheet and stock document until the transaction is commited or aborted so that no other users can perform on those documents??

My knowledge is really low on this case and please bare with me.

Cheers

Hi @Shanka_Somasiri,

Not sure why you can’t use filters by id in findOneAndUpdate({ _id : ....},{$set : {active : "true"}).session(...)

Once the documents you query are modified whithin a transaction atomically other will not be able to use the documents with snapshot read isolation until transaction commit.

You can proceed with other writes and commit/rollback based on your logic.

Thanks
Pavel

1 Like

I think my concern was how I can check the below conditions and provide an error message back to the client while using findOneAndUpdate.

if (!existingLoadingSheet) {
  session.endSession();
  throw new NotFoundError('Loading Sheet Not Found');
} // 1

if (existingLoadingSheet.get('agency')._id != agency) {
  session.endSession();
  throw new AccessRestrictedError();
} // 2

const oldLoadingsheetStockValuesArray = [...existingLoadingSheet?.records];
const oldLoadingsheetStockValuesObj = oldLoadingsheetStockValuesArray.reduce(
  function (result, item) {
	var key = item.stockId;
	result[key] = {
	  loadingCaseCount: item.loadingCaseCount,
	  loadingUnitCount: item.loadingUnitCount,
	  loadingTotal: item.loadingTotal,
	  stockId: item.stockId,
	  product: item.product,
	  index: item.index,
	  batchNo: item.batchNo,
	  type: item.type,
	};
	return result;
  },
  {}
); // 3

The same way, findOneAndUpdate will return no result if the document searched does not exist.

1 Like

Hi @Pavel_Duchovny,

I just finished altering my code and it works perfectly. You are a life saver.
Thank you sooo much for your assistance.

Appreciate it loads. <3

Cheers

1 Like

This topic was automatically closed 5 days after the last reply. New replies are no longer allowed.