HomeLearnHow-to

MongoDB Atlas Data Lake Tutorial: Federated Queries and $out to AWS S3

Published: Jun 09, 2020

  • Atlas
  • Data Lake
  • Python
  • ...

By Maxime Beugnet

Share

Last year at MongoDB World 2019, Eliot announced that MongoDB Atlas Data Lake was a new tool available in beta in the MongoDB Cloud Platform.

During this last year, MongoDB has been working closely with many customers to test this new tool and gathered much feedback to make it even better.

Today, after a year of refinement and improvement, MongoDB is proud to announce that MongoDB Atlas Data Lake is now generally available and can be used with confidence in your production environment.

In this tutorial, I will show you a new feature of MongoDB Atlas Data Lake called Federated Query that allows you to access your archived documents in S3 AND your documents in your MongoDB Atlas cluster with a SINGLE MQL query.

"MongoDB Atlas Data Lake Federated Queries"

This feature is really amazing because it allows you to have easy access to your archived data in S3 along with your "hot" data in your Atlas cluster. This feature could help you prevent your Atlas clusters from growing in size indefinitely and reduce your costs drastically. It also makes it easier to gain new insights by easily querying data residing in S3 and exposing it to your real-time app.

Finally, I will show you how to use the new version of the $out aggregation pipeline stage to write documents from a MongoDB Atlas cluster into an AWS S3 bucket.

#Prerequisistes

In order to follow along this tutorial, you need to:

If you did these actions correctly, you should have an M10 (or bigger) cluster running in your MongoDB Atlas project.

"MongoDB Atlas M10 cluster"

And your Data Lake page should look like this:

"MongoDB Atlas Data Lake setup"

You should also have an AWS S3 bucket linked to your Atlas Data Lake setup. Mine is called cold-data-mongodb in this tutorial.

A MongoDB Atlas M10 or bigger cluster is required here because MongoDB Atlas Data Lake uses X.509 certificates which are not supported on MongoDB Atlas shared tier at this time.

#We need some data

To illustrate how $out and federated queries work, I will use an overly simple dataset to keep things as easy as possible to understand.

Connect to your M10 cluster using the CONNECT button using the Mongo Shell and let's insert these 4 documents in the test database:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
db.orders.insertMany( [ { "_id": 1, "created": new Date("2020-05-30"), "items": [1, 3], "price": 20 }, { "_id": 2, "created": new Date("2020-05-31"), "items": [2, 3], "price": 25 }, { "_id": 3, "created": new Date("2020-06-01"), "items": [1, 3], "price": 20 }, { "_id": 4, "created": new Date("2020-06-02"), "items": [1, 2], "price": 15 }, ] )

This is the result you should see in your terminal:

"4 documents inserted in MongoDB Atlas M10 cluster"

#Archive Data to S3 with $out

Now that we have a "massive" collection of orders, we can consider archiving the oldest orders to an S3 bucket. Let's imagine that once a month is over, I can archive all the orders from the previous month. I will create one JSON file in S3 for all the orders created during the previous month.

Let's transfer these orders to S3 using the aggregation pipeline stage $out.

But first, we need to configure Atlas Data Lake correctly.

#Data Lake Configuration

The first thing we need to do is to make sure we can write to our S3 bucket and read the archived orders as well as the current orders in my M10 cluster.

This new feature in MongoDB Atlas Data Lake is called Federated Queries.

Now head to your Data Lake configuration:

"MongoDB Atlas Data Lake configuration"

And let's use the following configuration:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
{ "databases": [ { "name": "test", "collections": [ { "name": "orders", "dataSources": [ { "path": "/{min(created) isodate}-{max(created) isodate}*", "storeName": "cold-data-mongodb" }, { "collection": "orders", "database": "test", "storeName": "BigCluster" } ] } ], "views": [] } ], "stores": [ { "provider": "s3", "bucket": "cold-data-mongodb", "delimiter": "/", "includeTags": false, "name": "cold-data-mongodb", "region": "eu-west-1" }, { "provider": "atlas", "clusterName": "BigCluster", "name": "BigCluster", "projectId": "5e78e83fc61ce37535921257" } ] }

In this configuration, you can see that we have configured:

  • an S3 store: this is my S3 bucket that I named "cold-data-mongodb",
  • an Atlas store: this is my M10 cluster,
  • a database test with a collection orders that contains the data from this S3 store AND the data from my collection test.orders from my M10 cluster.

Feel free to replace cold-data-mongodb with your own bucket name. The very same one that you used during the Atlas Data Lake setup.

You can find your MongoDB Atlas project ID in your project settings:

"MongoDB Atlas Project ID"

In the path, I also told Atlas Data Lake that the JSON filename contains the min and max created dates of the orders it contains. This is useful for performance purposes: Atlas Data Lake won't have to scan all the files if I'm looking for an order on a given date. You can read more about data partitioning in the Data Lake documentation.

#$out to S3

Let's now collect the URI we are going to use to connect to Atlas Data Lake.

Click on the connect button:

"MongoDB Atlas Data Lake connect button"

Click on "Connect your application" and collect your URI:

"MongoDB Atlas Data Lake URI"

Now let's use Python to execute our aggregation pipeline and archive the 2 orders from May 2020 in our S3 bucket.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from datetime import datetime from pymongo import MongoClient client = MongoClient('<YOUR_ATLAS_DATA_LAKE_URI>') db = client.get_database("test") coll = db.get_collection("orders") start_date = datetime(2020, 5, 1, 0, 0, 0) # May 1st end_date = datetime(2020, 6, 1, 0, 0, 0) # June 1st pipeline = [ { '$match': { 'created': { '$gte': start_date, '$lt': end_date } } }, { '$out': { 's3': { 'bucket': 'cold-data-mongodb', 'region': 'eu-west-1', 'filename': start_date.isoformat('T', 'milliseconds') + 'Z-' + end_date.isoformat('T', 'milliseconds') + 'Z', 'format': {'name': 'json', 'maxFileSize': "200MiB"} } } } ] coll.aggregate(pipeline) print('Archive created!')

To execute this code, make sure you have Python 3 and the dependencies:

1
2
3
4
5
6
7
$ python3 --version Python 3.8.3 $ pip3 install pymongo dnspython Requirement already satisfied: pymongo in /home/polux/.local/lib/python3.8/site-packages (3.10.1) Requirement already satisfied: dnspython in /home/polux/.local/lib/python3.8/site-packages (1.16.0) $ python3 archive.py Archive created!

And now we can confirm that our archive was created correctly in our S3 bucket:

"file in the S3 bucket"

#Finish the Work

Now that our orders are safe in S3, I can delete these 2 orders from my Atlas cluster. Let's use Python again but this time, we need to use the URI from our Atlas cluster. The Atlas Data Lake URI doesn't allow this kind of operation.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from datetime import datetime from pymongo import MongoClient client = MongoClient('<YOUR-ATLAS-URI>') db = client.get_database("test") coll = db.get_collection("orders") start_date = datetime(2020, 5, 1, 0, 0, 0) # May 1st end_date = datetime(2020, 6, 1, 0, 0, 0) # June 1st query = { 'created': { '$gte': start_date, '$lt': end_date } } result = coll.delete_many(query) print('Deleted', result.deleted_count, 'orders.')

Let's run this code:

1
2
$ python3 remove.py Deleted 2 orders.

Now let's double check what we have in S3. Here is the content of the S3 file I downloaded:

1
2
{"_id":{"$numberDouble":"1.0"},"created":{"$date":{"$numberLong":"1590796800000"}},"items":[{"$numberDouble":"1.0"},{"$numberDouble":"3.0"}],"price":{"$numberDouble":"20.0"}} {"_id":{"$numberDouble":"2.0"},"created":{"$date":{"$numberLong":"1590883200000"}},"items":[{"$numberDouble":"2.0"},{"$numberDouble":"3.0"}],"price":{"$numberDouble":"25.0"}}

And here is what's left in my MongoDB Atlas cluster.

"Documents left in MongoDB Atlas cluster"

#Federated Queries

As mentioned above already, federated queries in MongoDB Atlas Data Lake allow me to retain easy access to 100% of my data. I actually already used this feature when I ran the aggregation pipeline with the $out stage.

Let's verify this one last time with Python:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from pymongo import MongoClient client = MongoClient('<YOUR_ATLAS_DATA_LAKE_URI>') db = client.get_database("test") coll = db.get_collection("orders") print("All the docs from S3 + Atlas:") docs = coll.find() for d in docs: print(d) pipeline = [ { '$group': { '_id': None, 'total_price': { '$sum': '$price' } } }, { '$project': { '_id': 0 } } ] print('\nI can also run an aggregation.') print(coll.aggregate(pipeline).next())

Here is the output:

1
2
3
4
5
6
7
8
All the docs from S3 + Atlas: {'_id': 1.0, 'created': datetime.datetime(2020, 5, 30, 0, 0), 'items': [1.0, 3.0], 'price': 20.0} {'_id': 2.0, 'created': datetime.datetime(2020, 5, 31, 0, 0), 'items': [2.0, 3.0], 'price': 25.0} {'_id': 3.0, 'created': datetime.datetime(2020, 6, 1, 0, 0), 'items': [1.0, 3.0], 'price': 20.0} {'_id': 4.0, 'created': datetime.datetime(2020, 6, 2, 0, 0), 'items': [1.0, 2.0], 'price': 15.0} I can also run an aggregation: {'total_price': 80.0}

#Wrap Up

MongoDB Atlas Data Lake is now production-ready and generally available starting today.

If you have a lot of infrequently access data in your Atlas cluster but you still need to be able to query it and access it easily once you've archived it to S3, Atlas Data Lake and the new Federated Query feature will help you save tons of money. If you're looking for an automated way to archive your data from Atlas Clusters to fully-managed S3 storage, then check out our new Atlas Online Archive feature!

Storage on S3 is a lot cheaper than scaling up your MongoDB Atlas cluster because your cluster is full of cold data and needs more RAM & storage size to operate correctly.

All the python code is available in this Github Repository.

Please let me know on Twitter if you liked my blog post: @MBeugnet.

If you have questions, please head to our developer community website where the MongoDB engineers and the MongoDB community will give you a hand.

More from this series

MongoDB Atlas Data Lake
  • MongoDB Atlas Data Lake Setup
  • MongoDB Atlas Data Lake Tutorial: Federated Queries and $out to AWS S3
  • How to Archive Old Data to Cloud Object Storage with MongoDB Atlas Data Lake & Online Archive
MongoDB Icon
  • Developer Hub
  • Documentation
  • University
  • Community Forums

© MongoDB, Inc.