Query to get the latest value for each parent

We have IoT telemetry data coming through into 1 hour buckets (documents with many samples).
Example document:

{ 
    "_id" : ObjectId("5f5024314a74f35fc6fb37e1"), 
    "date" : ISODate("2020-09-02T23:00:00.000+0000"), 
    "device" : ObjectId("5dd7596761ced7001253aab3"), 
    "telemetry" : "total-active-power", 
    "max" : 17175070000.0, 
    "min" : 17174710000.0, 
    "nsamples" : NumberInt(4), 
    "samples" : [
        {
            "date" : ISODate("2020-09-02T23:00:07.194+0000"), 
            "data" : {
                "samples" : NumberInt(12), 
                "latest" : 17174710000.0
            }
        }, 
        {
            "date" : ISODate("2020-09-02T23:01:07.328+0000"), 
            "data" : {
                "samples" : NumberInt(12), 
                "latest" : 17174730000.0
            }
        }, 
        {
            "date" : ISODate("2020-09-02T23:02:07.500+0000"), 
            "data" : {
                "samples" : NumberInt(12), 
                "latest" : 17174760000.0
            }
        }, 
        {
            "date" : ISODate("2020-09-02T23:03:07.751+0000"), 
            "data" : {
                "samples" : NumberInt(12), 
                "latest" : 17174790000.0
            }
        }
    ], 
    "sum" : 68699160000.0
}

It is easy to create an aggregate query that finds the latest value for 1 device and 1 telemetry ObjectId, but it seems impossible to get the latest value for MANY devices and telemetry ObjectIds.

Can do: Latest value for device ObjectId(“5dd7596761ced7001253aab3”) and telemetry “total-active-power”
Can’t do: Latest value for device ObjectId array (many) and “total-active-power”

In SQL I would do it like this (timescaledb):

SELECT data.* FROM vehicles v
  INNER JOIN LATERAL (
    SELECT * FROM location l
      WHERE l.vehicle_id = v.vehicle_id
      ORDER BY time DESC LIMIT 1
  ) AS data
ON true
ORDER BY v.vehicle_id, data.time DESC;

See https://docs.timescale.com/latest/using-timescaledb/reading-data

How would I do this in a Mongo aggregation?

Thanks

Hi @Jeremy_Carter and welcome in the community :+1: !

I made a small collection with these 3 documents (I just incremented the “_id” and the “device” by 1 each time.

{
	"_id" : ObjectId("5f5024314a74f35fc6fb37e1"),
	"date" : ISODate("2020-09-02T23:00:00Z"),
	"device" : ObjectId("5dd7596761ced7001253aab3"),
	"telemetry" : "total-active-power",
	"max" : 17175070000,
	"min" : 17174710000,
	"nsamples" : 4,
	"samples" : [
		{
			"date" : ISODate("2020-09-02T23:00:07.194Z"),
			"data" : {
				"samples" : 12,
				"latest" : 17174710000
			}
		},
		{
			"date" : ISODate("2020-09-02T23:01:07.328Z"),
			"data" : {
				"samples" : 12,
				"latest" : 17174730000
			}
		},
		{
			"date" : ISODate("2020-09-02T23:02:07.500Z"),
			"data" : {
				"samples" : 12,
				"latest" : 17174760000
			}
		},
		{
			"date" : ISODate("2020-09-02T23:03:07.751Z"),
			"data" : {
				"samples" : 12,
				"latest" : 17174790000
			}
		}
	],
	"sum" : 68699160000
}
{
	"_id" : ObjectId("5f5024314a74f35fc6fb37e2"),
	"date" : ISODate("2020-09-02T23:00:00Z"),
	"device" : ObjectId("5dd7596761ced7001253aab4"),
	"telemetry" : "total-active-power",
	"max" : 17175070000,
	"min" : 17174710000,
	"nsamples" : 4,
	"samples" : [
		{
			"date" : ISODate("2020-09-02T23:00:07.194Z"),
			"data" : {
				"samples" : 12,
				"latest" : 17174710000
			}
		},
		{
			"date" : ISODate("2020-09-02T23:01:07.328Z"),
			"data" : {
				"samples" : 12,
				"latest" : 17174730000
			}
		},
		{
			"date" : ISODate("2020-09-02T23:02:07.500Z"),
			"data" : {
				"samples" : 12,
				"latest" : 17174760000
			}
		},
		{
			"date" : ISODate("2020-09-02T23:03:07.751Z"),
			"data" : {
				"samples" : 12,
				"latest" : 17174790000
			}
		}
	],
	"sum" : 68699160000
}
{
	"_id" : ObjectId("5f5024314a74f35fc6fb37e3"),
	"date" : ISODate("2020-09-02T23:00:00Z"),
	"device" : ObjectId("5dd7596761ced7001253aab5"),
	"telemetry" : "total-active-power",
	"max" : 17175070000,
	"min" : 17174710000,
	"nsamples" : 4,
	"samples" : [
		{
			"date" : ISODate("2020-09-02T23:00:07.194Z"),
			"data" : {
				"samples" : 12,
				"latest" : 17174710000
			}
		},
		{
			"date" : ISODate("2020-09-02T23:01:07.328Z"),
			"data" : {
				"samples" : 12,
				"latest" : 17174730000
			}
		},
		{
			"date" : ISODate("2020-09-02T23:02:07.500Z"),
			"data" : {
				"samples" : 12,
				"latest" : 17174760000
			}
		},
		{
			"date" : ISODate("2020-09-02T23:03:07.751Z"),
			"data" : {
				"samples" : 12,
				"latest" : 17174790000
			}
		}
	],
	"sum" : 68699160000
}

And I wrote this little aggregation:

[
  {
    '$match': {
      'telemetry': 'total-active-power', 
      'device': {
        '$in': [
          new ObjectId('5dd7596761ced7001253aab3'), new ObjectId('5dd7596761ced7001253aab4'), new ObjectId('5dd7596761ced7001253aab5')
        ]
      }
    }
  }, {
    '$unwind': '$samples'
  }, {
    '$sort': {
      'samples.date': -1
    }
  }, {
    '$group': {
      '_id': '$device', 
      'value': {
        '$first': '$samples'
      }
    }
  }
]

Which gives me this result:

{
	"_id" : ObjectId("5dd7596761ced7001253aab3"),
	"value" : {
		"date" : ISODate("2020-09-02T23:03:07.751Z"),
		"data" : {
			"samples" : 12,
			"latest" : 17174790000
		}
	}
}
{
	"_id" : ObjectId("5dd7596761ced7001253aab5"),
	"value" : {
		"date" : ISODate("2020-09-02T23:03:07.751Z"),
		"data" : {
			"samples" : 12,
			"latest" : 17174790000
		}
	}
}
{
	"_id" : ObjectId("5dd7596761ced7001253aab4"),
	"value" : {
		"date" : ISODate("2020-09-02T23:03:07.751Z"),
		"data" : {
			"samples" : 12,
			"latest" : 17174790000
		}
	}
}

I think it’s what you asked for but maybe I misunderstood your demand. Please let me know if that’s not it.

1 Like

Hi @MaBeuLux88. Thanks for having a look at this. Let me just add a bit more information to the mix.

The collection of telemetry buckets currently has 400,000 documents. Each bucket is for 1 hour worth of telemetry. This was modelled as per the Mongo IoT whitepaper.

The current query which has an input of 1 device and 1 telemetry key (eg total-active-power, temperature, etc). This query finds the top most bucket by date and unwinds the samples.

See this aggregation (find the latest value for 1 telemetry and 1 device only):

[
  {
    $match: {
      telemetry: 'total-apparent-power', 
      device: ObjectId("5dce189883c74c001239c602")
    }
  },
  { $sort: { date: -1 } },
  { $limit: 2 },
  {
	$unwind: {
	  path: '$samples'
	}
  },
  {
	$sort: {
	  'samples.date': -1
	}
  },
  {
	$limit: 1
  },
  {
	$project: {
	  _id: 0,
	  date: '$samples.date',
	  device: '$device',
	  telemetry: '$telemetry',
	  value: '$samples.data'
	}
  }
]

This query is very fast because the sort and limit are combined and of course the limit is just 1.

Now the problem is that what if we wanted to do this query for 500 devices without sending 500 aggregations to mongo and waiting for them all to return?

If I run your aggregation on our large collection I have to send allowDiskUse true, and it takes 1 second to get the latest for 5 devices. But if I send 5 x single aggregation it takes < 40ms total.

The issue with my pipeline is that it’s actually “unwining” all the samples for all the dates of device X, Y and Z of the telemetry T. You need to find a way to reduce the documents for each device.

If you know each devices have an entry in the last 24 hours, you could use this to filter down the list of docs in the first match with date $gt (NOW - 24h) to limit the number of documents in the pipeline at this stage.

If some device have been stopped or are no longer reporting data, the date might be out of the filter. To solve this, we can find the latest entry for each device + telemetry like this:

[
  {
    '$match': {
      'telemetry': 'total-active-power', 
      'device': {
        '$in': [
          new ObjectId('5dd7596761ced7001253aab3'), new ObjectId('5dd7596761ced7001253aab5')
        ]
      }
    }
  }, {
    '$sort': {
      'date': -1
    }
  }, {
    '$group': {
      '_id': '$device', 
      'doc': {
        '$first': '$$ROOT'
      }
    }
  }, {
    '$project': {
      'date': '$doc.date', 
      'device': '$doc.device', 
      'telemetry': 'doc.telemetry', 
      'max': '$doc.max', 
      'min': '$doc.min', 
      'nsamples': '$doc.nsamples', 
      'samples': {
        '$last': '$doc.samples'
      }, 
      'sum': '$doc.sum'
    }
  }
]

In this pipeline, I’m extracting ONLY the latest doc for each device with the $group stage. Then, you don’t really need to use $unwind at all, as you always want to retrieve the latest entry in each “sample” array anyway. So we can use the $last array operator to retrieve this value.

I think this pipeline is a lot more optimized and should not need the allowDiskUse option for just 500 devices.

This should perform really with an index {telemetry: 1, date: -1, device: 1} in this order as this should avoid the in-memory sort.

It would perform even better if you could had a filter on the date in the first $match stage to limit the number of documents in this stage.

To validate my pipeline, I used these 3 documents (one device has 2 entries with 2 different dates).

{"_id":{"$oid":"5f5024314a74f35fc6fb37e1"},"date":{"$date":"2020-09-02T23:00:00Z"},"device":{"$oid":"5dd7596761ced7001253aab3"},"telemetry":"total-active-power","max":1.717507E+10,"min":1.717471E+10,"nsamples":4.0,"samples":[{"date":{"$date":"2020-09-02T23:00:07.194Z"},"data":{"samples":12.0,"latest":1.717471E+10}},{"date":{"$date":"2020-09-02T23:01:07.328Z"},"data":{"samples":12.0,"latest":1.717473E+10}},{"date":{"$date":"2020-09-02T23:02:07.5Z"},"data":{"samples":12.0,"latest":1.717476E+10}},{"date":{"$date":"2020-09-02T23:03:07.751Z"},"data":{"samples":12.0,"latest":1.717479E+10}}],"sum":6.869916E+10}
{"_id":{"$oid":"5f5024314a74f35fc6fb37e2"},"date":{"$date":"2020-09-02T22:00:00Z"},"device":{"$oid":"5dd7596761ced7001253aab3"},"telemetry":"total-active-power","max":1.717507E+10,"min":1.717471E+10,"nsamples":4.0,"samples":[{"date":{"$date":"2020-09-02T22:00:07.194Z"},"data":{"samples":12.0,"latest":1.717471E+10}},{"date":{"$date":"2020-09-02T22:01:07.328Z"},"data":{"samples":12.0,"latest":1.717473E+10}},{"date":{"$date":"2020-09-02T22:02:07.5Z"},"data":{"samples":12.0,"latest":1.717476E+10}},{"date":{"$date":"2020-09-02T22:03:07.751Z"},"data":{"samples":12.0,"latest":1.717479E+10}}],"sum":6.869916E+10}
{"_id":{"$oid":"5f5024314a74f35fc6fb37e3"},"date":{"$date":"2020-09-02T23:00:00Z"},"device":{"$oid":"5dd7596761ced7001253aab5"},"telemetry":"total-active-power","max":1.717507E+10,"min":1.717471E+10,"nsamples":4.0,"samples":[{"date":{"$date":"2020-09-02T23:00:07.194Z"},"data":{"samples":12.0,"latest":1.717471E+10}},{"date":{"$date":"2020-09-02T23:01:07.328Z"},"data":{"samples":12.0,"latest":1.717473E+10}},{"date":{"$date":"2020-09-02T23:02:07.5Z"},"data":{"samples":12.0,"latest":1.717476E+10}},{"date":{"$date":"2020-09-02T23:03:07.751Z"},"data":{"samples":12.0,"latest":1.717479E+10}}],"sum":6.869916E+10}

I hope this solves your issue :+1: !

Cheers,
Maxime.

1 Like

This topic was automatically closed 5 days after the last reply. New replies are no longer allowed.