Optimization of pulling "latest record earlier than X" in a large (500-million-record) collection using an index

Say I have a 4-field collection:

sku (string)
warehouse (string)
record_time (ISODate)
qty (int)

I would like to query the data at a “snapshot” in time. For example, I may say “show me the qty of everything as of Jan 1, 2020”. The data is such that some combinations of sku + warehouse may not have any entries for days/months.
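
For concreteness, the documents look something like this (illustrative values only):

db.test.insertMany([
  { sku: 'a', warehouse: 'a', record_time: ISODate('2019-12-28T09:00:00'), qty: 12 },
  { sku: 'a', warehouse: 'a', record_time: ISODate('2019-12-31T17:30:00'), qty: 7 },
  { sku: 'b', warehouse: 'c', record_time: ISODate('2019-11-02T03:15:00'), qty: 40 },
]);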

First, I see that mongo has a recommended practice here:
https://docs.mongodb.com/master/reference/operator/aggregation/group/#group-pipeline-optimization
(jira ticket https://jira.mongodb.org/browse/SERVER-9507)

So, I have something like the following index:

db.test.createIndex({
  sku: 1,
  warehouse: 1,
  record_time: -1,
});

So, to get a specific SKU + warehouse at a certain time, I could run:

db.test.aggregate([
  {$match: {sku: 'a', warehouse: 'a', record_time: {$lte: ISODate('2020-01-01T00:00:00')}}},
  {$sort: {record_time: -1}},
  {$limit: 1}
])

If I want to get all distinct warehouse + SKU, then:

db.test.aggregate([
  {$match: {record_time: {$lte: ISODate('2020-01-01T00:00:00')}}},
  {$sort: {
    sku: 1,
    warehouse: 1,
    record_time: -1,
  }},
  {$group: {
    _id: {
      sku: '$sku',
      warehouse: '$warehouse',
    },
    qty: {'$first': '$qty'},
    last_record_time: {'$first': '$record_time'}
  }},
], {allowDiskUse: true});

This gives me the data I want. However, the query runs VERY SLOWLY (10+ minutes on an M50 in Atlas).

I can see one problem: the $match runs on “record_time”, which isn’t the leading field of the index, so it’s probably causing a large part of the slowdown. However, if I just remove the $match stage, the query takes just as long to run.

Based on my desired outcome, is there a different way to structure the data/indexes to allow for the query “give me the latest entry before ISODate X for every sku + warehouse combination” to be run in a reasonable timeframe?

Thank you for any advice.

I am using Mongo 4.2.3

Explain output from the aggregation

db.test.aggregate([
  {$sort: {
    sku: 1,
    warehouse: 1,
    record_time: -1,
  }},
  {$group: {
    _id: {
      sku: '$sku',
      warehouse: '$warehouse',
    },
    qty: {'$first': '$qty'},
    record_time: {'$first': '$record_time'}
  }},
], {allowDiskUse: true, explain: true});
{
	"stages" : [
		{
			"$cursor" : {
				"query" : {
					
				},
				"sort" : {
					"sku" : 1,
					"warehouse" : 1,
					"record_time" : -1
				},
				"fields" : {
					"qty" : 1,
					"record_time" : 1,
					"sku" : 1,
					"warehouse" : 1,
					"_id" : 0
				},
				"queryPlanner" : {
					"plannerVersion" : 1,
					"namespace" : "test.test",
					"indexFilterSet" : false,
					"parsedQuery" : {
						
					},
					"queryHash" : "E47CEE36",
					"planCacheKey" : "E47CEE36",
					"winningPlan" : {
						"stage" : "FETCH",
						"inputStage" : {
							"stage" : "IXSCAN",
							"keyPattern" : {
								"sku" : 1,
								"warehouse" : 1,
								"record_time" : -1
							},
							"indexName" : "sku_1_warehouse_1_record_time_-1",
							"isMultiKey" : false,
							"multiKeyPaths" : {
								"sku" : [ ],
								"warehouse" : [ ],
								"record_time" : [ ]
							},
							"isUnique" : false,
							"isSparse" : false,
							"isPartial" : false,
							"indexVersion" : 2,
							"direction" : "forward",
							"indexBounds" : {
								"sku" : [
									"[MinKey, MaxKey]"
								],
								"warehouse" : [
									"[MinKey, MaxKey]"
								],
								"record_time" : [
									"[MaxKey, MinKey]"
								]
							}
						}
					},
					"rejectedPlans" : [ ]
				}
			}
		},
		{
			"$group" : {
				"_id" : {
					"sku" : "$sku",
					"warehouse" : "$warehouse"
				},
				"qty" : {
					"$first" : "$qty"
				},
				"last_record_time" : {
					"$first" : "$record_time"
				}
			}
		}
	]
}

I tried adding qty to the index so that the query could utilize a “covered index”, but it had minimal impact.
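
For reference, that index (matching the keyPattern in the plan below) was created as:

db.test.createIndex({
  sku: 1,
  warehouse: 1,
  record_time: -1,
  qty: 1,
});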

{
	"stages" : [
		{
			"$cursor" : {
				"query" : {
					
				},
				"sort" : {
					"sku" : 1,
					"warehouse" : 1,
					"record_time" : -1
				},
				"fields" : {
					"qty" : 1,
					"record_time" : 1,
					"sku" : 1,
					"warehouse" : 1,
					"_id" : 0
				},
				"queryPlanner" : {
					"plannerVersion" : 1,
					"namespace" : "test.test",
					"indexFilterSet" : false,
					"parsedQuery" : {
						
					},
					"queryHash" : "28987361",
					"planCacheKey" : "28987361",
					"winningPlan" : {
						"stage" : "PROJECTION_COVERED",
						"transformBy" : {
							"qty" : 1,
							"record_time" : 1,
							"sku" : 1,
							"warehouse" : 1,
							"_id" : 0
						},
						"inputStage" : {
							"stage" : "IXSCAN",
							"keyPattern" : {
								"sku" : 1,
								"warehouse" : 1,
								"record_time" : -1,
								"qty" : 1
							},
							"indexName" : "sku_1_warehouse_1_record_time_-1_qty_1",
							"isMultiKey" : false,
							"multiKeyPaths" : {
								"sku" : [ ],
								"warehouse" : [ ],
								"record_time" : [ ],
								"qty" : [ ]
							},
							"isUnique" : false,
							"isSparse" : false,
							"isPartial" : false,
							"indexVersion" : 2,
							"direction" : "forward",
							"indexBounds" : {
								"sku" : [
									"[MinKey, MaxKey]"
								],
								"warehouse" : [
									"[MinKey, MaxKey]"
								],
								"record_time" : [
									"[MaxKey, MinKey]"
								],
								"qty" : [
									"[MinKey, MaxKey]"
								]
							}
						}
					},
					"rejectedPlans" : [ ]
				}
			}
		},
		{
			"$group" : {
				"_id" : {
					"sku" : "$sku",
					"warehouse" : "$warehouse"
				},
				"qty" : {
					"$first" : "$qty"
				},
				"record_time" : {
					"$first" : "$record_time"
				}
			}
		}
	]
}

If I want to get all distinct warehouse + SKU, then:

db.test.aggregate([
  { $match: { record_time: { $lte: ISODate('2020-01-01T00:00:00') } } },
...

With the available index { sku: 1, warehouse: 1, record_time: -1 }, the aggregation cannot use that index for the $match stage, because record_time is not a prefix of the key pattern. Having the $match stage at the beginning of the pipeline is good practice, but without a supporting index it is a very slow query over that much data.
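
To illustrate the prefix rule, roughly:

// These predicates cover a prefix of { sku: 1, warehouse: 1, record_time: -1 },
// so they can use the index:
db.test.find({ sku: 'a' })
db.test.find({ sku: 'a', warehouse: 'a' })
db.test.find({ sku: 'a', warehouse: 'a', record_time: { $lte: ISODate('2020-01-01T00:00:00') } })

// record_time alone is not a prefix, so this cannot use that index:
db.test.find({ record_time: { $lte: ISODate('2020-01-01T00:00:00') } })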

I think defining another index, only on the record_time field, is the right approach. I am sure there will be a performance gain (how much improvement depends upon the data). I suggest you try this approach on a sample data set. Generate the query plans before and after creating the new index, using the “executionStats” mode of explain().
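
For example, something along these lines, run before and after creating the new index (compare totalKeysExamined and totalDocsExamined in the output):

db.test.explain('executionStats').aggregate([
  {$match: {record_time: {$lte: ISODate('2020-01-01T00:00:00')}}},
  {$sort: {sku: 1, warehouse: 1, record_time: -1}},
  {$group: {
    _id: {sku: '$sku', warehouse: '$warehouse'},
    qty: {'$first': '$qty'},
    last_record_time: {'$first': '$record_time'}
  }}
], {allowDiskUse: true});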

Reference: Compound Index prefixes

Yes, setting up an index on record_time will be a great help.

Then I don’t think you need to sort on sku and warehouse, only on record_time, because what matters is getting values in reverse record_time order. For sku and warehouse, the $group will do the job.

So you may try:

db.test.createIndex({ record_time: 1 })

db.test.aggregate([
  {$match: {record_time: {$lte: ISODate('2020-01-01T00:00:00')}}},
  {$sort: {record_time: -1}},
  {$group: {
    _id: {sku: '$sku', warehouse: '$warehouse'},
    qty: {'$first': '$qty'},
    last_record_time: {'$first': '$record_time'}
  }}
], {allowDiskUse: true});

@RemiJ @Prasad_Saya
Ya, I shouldn’t have even mentioned the $match stage you are talking about. If you re-read my post, you can see that the query slowness is not caused by that match: in the explain plans above, the $match is removed entirely and the query takes just as long to run.

Regardless, thanks for at least replying. I ended up building a process that recombines the data into hourly snapshots and “cross fills” records for date-hours during which there was no delta, so that the query can be run against a single hour, without sorting, to get the latest record.
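
As a rough sketch of that workaround (the collection and field names here are made up): every sku + warehouse combination gets one document per hour, carried forward from the previous hour when there was no delta, so a snapshot becomes a simple equality match with no sort:

// Hypothetical pre-aggregated collection: one doc per sku + warehouse per date-hour,
// with rows carried forward ("cross filled") for hours that had no new event.
db.test_hourly.createIndex({ snapshot_hour: 1, sku: 1, warehouse: 1 });

// "Qty of everything as of Jan 1, 2020" is now just:
db.test_hourly.find({ snapshot_hour: ISODate('2020-01-01T00:00:00') });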

I’d be interested to hear if anyone has a similar type of data set and use case (insert-only collection of events, and grouping by “last event per group id before time X”).