
Using MongoDB Realm WebHooks with Amazon Kinesis Data Firehose

Published: Jul 29, 2020

By Aaron Bassett

With MongoDB Realm's AWS integration, it has always been simple to use MongoDB as a Kinesis data stream. Now, with the launch of third-party data destinations in Kinesis, you can also use MongoDB Realm and MongoDB Atlas as an AWS Kinesis Data Firehose destination.

Keep in mind that this is just an example. You do not need to use Atlas as both the source and destination for your Kinesis streams. I am only doing so in this example to demonstrate how you can use MongoDB Atlas as both an AWS Kinesis Data and Delivery Stream. But, in actuality, you can use any source for your data that AWS Kinesis supports, and still use MongoDB Atlas as the destination.

#Prerequisites

Before we get started, you will need the following:

#Setting up our Kinesis Data Stream

RaspberryPi 3 with a Sense HAT

In this example, the source of my data is a Raspberry Pi with a Sense HAT. The output from the Sense HAT is read by a Python script running on the Pi. This script then stores the sensor data such as temperature, humidity, and pressure in MongoDB Atlas.

    import os
    import platform
    import time
    from datetime import datetime

    from pymongo import MongoClient
    from sense_hat import SenseHat

    # Setup the Sense HAT module and connection to MongoDB Atlas
    sense = SenseHat()
    # Read the connection string from the environment
    client = MongoClient(os.environ["MONGODB_CONNECTION_STRING"])
    db = client.monitors

    sense.load_image("img/realm-sensehat.png")


    # If the acceleration breaches 1G we assume the device is being moved
    def is_moving(x, y, z):
        for acceleration in [x, y, z]:
            if acceleration < -1 or acceleration > 1:
                return True

        return False


    while True:
        # prepare the object to save as a document in Atlas
        log = {
            "nodeName": platform.node(),
            "humidity": sense.get_humidity(),
            "temperature": sense.get_temperature(),
            "pressure": sense.get_pressure(),
            "isMoving": is_moving(**sense.get_accelerometer_raw()),
            "acceleration": sense.get_accelerometer_raw(),
            "recordedAt": datetime.now(),
        }

        # Write the report object to MongoDB Atlas
        report = db.reports.insert_one(log)

        # Pause for 0.5 seconds before capturing next round of sensor data
        time.sleep(0.5)

I then use a Realm Database Trigger to transform this data into a Kinesis Data Stream.

Realm functions are useful if you need to transform or do some other computation with the data before putting the record into Kinesis. However, if you do not need to do any additional computation, it is even easier with AWS EventBridge. MongoDB offers an AWS EventBridge partner event source that lets you send Realm Trigger events to an event bus instead of calling a Realm Function. You can configure any Realm Trigger to send events to EventBridge. You can find out more in the documentation: "Send Trigger Events to AWS EventBridge"

    // Function is triggered anytime a document is inserted/updated in our collection
    exports = function (event) {
      // Access the AWS service in Realm
      const awsService = context.services.get("AWSKinesis")

      try {
        awsService
          .kinesis()
          .PutRecord({
            /* this trigger function will receive the full document that triggered the event
               put this document into Kinesis
            */
            Data: JSON.stringify(event.fullDocument),
            StreamName: "realm",
            PartitionKey: "1",
          })
          .then(function (response) {
            return response
          })
      } catch (error) {
        console.log(JSON.parse(error))
      }
    }

You can find out more details on how to do this in our blog post "Integrating MongoDB and Amazon Kinesis for Intelligent, Durable Streams."

#Amazon Kinesis Data Firehose Payloads

AWS Kinesis HTTP(s) Endpoint Delivery Requests are sent via POST with a single JSON document as the request body. Delivery destination URLs must be HTTPS.

#Delivery Stream Request Headers

Each Delivery Stream Request contains essential information in the HTTP headers, some of which we'll use in our Realm WebHook in a moment.

  • X-Amz-Firehose-Protocol-Version: This header indicates the version of the request/response formats. Currently, the only version is 1.0, but new ones may be added in the future
  • X-Amz-Firehose-Request-Id: The value of this header is an opaque GUID used for debugging purposes. Endpoint implementations should log the value of this header if possible, for both successful and unsuccessful requests. The request ID is kept the same between multiple attempts of the same request
  • X-Amz-Firehose-Source-Arn: The ARN of the Firehose Delivery Stream represented in ASCII string format. The ARN encodes region, AWS account id, and the stream name
  • X-Amz-Firehose-Access-Key: This header carries an API key or other credentials. This value is set when we create or update the delivery stream. We'll discuss it in more detail later

#Delivery Stream Request Body

The body carries a single JSON document. You can configure the maximum body size, but it has an upper limit of 64 MiB before compression. The JSON document has the following properties:

  • requestId: Same as the value in the X-Amz-Firehose-Request-Id header, duplicated here for convenience
  • timestamp: The timestamp (milliseconds since epoch) at which the Firehose server generated this request
  • records: The actual records of the Delivery Stream, carrying your data. This is an array of objects, each with a single property of data. This property is a base64 encoded string of your data. Each request can contain a minimum of 1 record and a maximum of 10,000. It's worth noting that a record can be empty
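
To make that shape concrete, here is a minimal sketch that reads a request body of this form. It runs in plain Node.js rather than in Realm, so it uses Buffer for the Base64 step. The readRecords name, the sample values, and the assumption that an empty record arrives as an empty data string are all illustrative rather than taken from the Firehose documentation.

```javascript
// A request body shaped like the one described above. The values are
// made up: the first record decodes to {"temperature":21.5} and the
// second record is empty.
const rawBody = JSON.stringify({
  requestId: "sample-request-id",
  timestamp: 1596000000000,
  records: [{ data: "eyJ0ZW1wZXJhdHVyZSI6MjEuNX0=" }, { data: "" }],
});

// Hypothetical helper: parse the envelope and decode each record.
function readRecords(bodyText) {
  const { requestId, timestamp, records } = JSON.parse(bodyText);
  const documents = records
    // Skip empty records (assumed here to carry an empty data string)
    .filter((record) => typeof record.data === "string" && record.data.length > 0)
    // Base64 string -> JSON string -> JavaScript object
    .map((record) => JSON.parse(Buffer.from(record.data, "base64").toString("utf8")));
  return { requestId, generatedAt: new Date(timestamp), documents };
}

console.log(readRecords(rawBody));
```

Filtering out empty records before parsing avoids a JSON.parse error on an empty string.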

#Response Format

When responding to a Delivery Stream Request, there are a few things you should be aware of.

#Status Codes

The HTTP status code must be in the 2xx, 4xx, or 5xx range. Firehose will not follow redirects, so nothing in the 3xx range is acceptable. Only a status of 200 is considered a successful delivery of the records; all other statuses are regarded as a retriable error, except 413.

413 (size exceeded) is considered a permanent failure and will not be retried. In all other error cases, Firehose will reattempt delivery of the same batch of records using an exponential back-off algorithm.

The retries are backed off using an initial back-off time of 1 second with a jitter factor of 15%. Each subsequent retry is backed off using the formula initial-backoff-time * (multiplier(2) ^ retry_count) with added jitter. The back-off time is capped at a maximum interval of 2 minutes. For example, on the n-th retry the back-off time is MIN(120 sec, 1 * 2^n) * random(0.85, 1.15).

These parameters are subject to change. Please refer to the AWS Firehose documentation for exact initial back-off time, max back-off time, multiplier, and jitter percentages.
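
As a rough sketch of that schedule, the function below computes an exponential back-off that is capped at two minutes and scaled by up to 15% of jitter in either direction. The backoffSeconds name and its options are hypothetical, the defaults simply mirror the values quoted above, and applying the jitter after the cap is an assumption.

```javascript
// Illustrative only: the defaults mirror the values quoted in this
// article and may not match what Firehose currently uses.
function backoffSeconds(retryCount, random = Math.random, options = {}) {
  const { initial = 1, multiplier = 2, maxInterval = 120, jitter = 0.15 } = options;
  // Exponential growth, capped at the maximum interval
  const base = Math.min(maxInterval, initial * Math.pow(multiplier, retryCount));
  // Scale by a random factor in [1 - jitter, 1 + jitter)
  const factor = 1 + jitter * (2 * random() - 1);
  return base * factor;
}

for (let n = 0; n <= 8; n++) {
  console.log(`retry ${n}: ~${backoffSeconds(n).toFixed(2)}s`);
}
```

Passing the random source as a parameter keeps the function deterministic when you want to check the schedule by hand.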

#Other Response Headers

As well as the HTTP status code your response should include the following headers:

  • Content-Type: The only acceptable content type is application/json
  • Content-Length: The Content-Length header must be present if the response has a body

Do not send a Content-Encoding header; the body must be uncompressed.

#Response Body

Just like the Request, the Response body is JSON, but it has a maximum size of 1 MiB. This JSON body has two required properties:

  • requestId: This must match the requestId in the Delivery Stream Request
  • timestamp: The timestamp (milliseconds since epoch) at which the server processed this request

If there was a problem processing the request, you can optionally include an errorMessage property. If a request fails after exhausting all retries, the last instance of this error message is copied to the error output S3 bucket, if one has been configured for the Delivery Stream.
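
The response rules above can be collected into one small helper. This is a sketch in plain Node.js, not Realm code: buildFirehoseResponse is a hypothetical name, and mapping every failure to a 500 is a simplification for illustration.

```javascript
// Hypothetical helper: returns the status, headers, and body that an
// endpoint could send back to Firehose.
function buildFirehoseResponse(requestId, errorMessage) {
  // requestId must echo the value from the incoming request
  const body = { requestId, timestamp: Date.now() };
  if (errorMessage !== undefined) {
    body.errorMessage = String(errorMessage);
  }
  const text = JSON.stringify(body);
  return {
    statusCode: errorMessage === undefined ? 200 : 500,
    headers: {
      "Content-Type": "application/json",
      // Content-Length is a byte count, not a character count
      "Content-Length": String(Buffer.byteLength(text, "utf8")),
    },
    body: text,
  };
}

console.log(buildFirehoseResponse("sample-request-id"));
console.log(buildFirehoseResponse("sample-request-id", "write failed"));
```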

#Storing Shared Secrets

When we configure our Kinesis Delivery Stream, we will have the opportunity to set an AccessKey value. This is the same value which is sent with each request as the X-Amz-Firehose-Access-Key header. We will use this shared secret to validate the source of the request.

We shouldn't hard-code this access key in our Realm function; instead, we will create a new secret named FIREHOSE_ACCESS_KEY. It can be any value, but keep a note of it as you'll need to reference it later when we configure the Kinesis Delivery Stream.
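
The check itself is small. The sketch below is a hypothetical helper (hasValidAccessKey is not part of Realm) that compares the header against the expected secret. It accepts the header value either as a plain string or as an array of strings, since the Realm function in this article reads the request ID header with [0], which suggests header values arrive as arrays.

```javascript
// Hypothetical helper: returns true only when the access key header is
// present and matches the expected shared secret exactly.
function hasValidAccessKey(headers, expectedKey) {
  const raw = headers["X-Amz-Firehose-Access-Key"];
  // Accept either ["value"] or "value"
  const provided = Array.isArray(raw) ? raw[0] : raw;
  return typeof provided === "string" && provided.length > 0 && provided === expectedKey;
}

console.log(hasValidAccessKey({ "X-Amz-Firehose-Access-Key": ["my-secret"] }, "my-secret"));
console.log(hasValidAccessKey({}, "my-secret"));
```

Rejecting a missing or empty header explicitly means an unset secret can never match an absent header by accident.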

Screenshot of Realm Secrets

#Creating our Realm WebHook

Before we can write the code for our WebHook, we first need to configure it. The "Configure Service WebHooks" guide in the Realm documentation goes into more detail, but you will need to configure the following options:

  • Authentication type must be set to system
  • The HTTP method is POST
  • "Respond with result" is disabled
  • Request validation must be set to "No Additional Authorization"; we need to handle authenticating Requests ourselves using the X-Amz-Firehose-Access-Key header

Screenshot of Realm function settings

#The Realm Function

For our WebHook we need to write a function which:

  • Receives a POST request from Kinesis
  • Ensures that the X-Amz-Firehose-Access-Key header value matches the FIREHOSE_ACCESS_KEY secret
  • Parses the JSON body from the request
  • Iterates over the records array and base64 decodes the data in each
  • Parses the base64 decoded JSON string into a JavaScript object
  • Writes the object to MongoDB Atlas as a new document
  • Returns the correct status code and JSON body to Kinesis in the response

    exports = function(payload, response) {
      /* Using Buffer in Realm causes a severe performance hit
         this function is ~6 times faster
      */
      const decodeBase64 = (s) => {
        var e={},i,b=0,c,x,l=0,a,r='',w=String.fromCharCode,L=s.length
        var A="ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"
        for(i=0;i<64;i++){e[A.charAt(i)]=i}
        for(x=0;x<L;x++){
          c=e[s.charAt(x)];b=(b<<6)+c;l+=6
          while(l>=8){((a=(b>>>(l-=8))&0xff)||(x<(L-2)))&&(r+=w(a))}
        }
        return r
      }

      // Get AccessKey from Request Headers
      const firehoseAccessKey = payload.headers["X-Amz-Firehose-Access-Key"]

      // Check shared secret is the same to validate Request source
      if(firehoseAccessKey == context.values.get("FIREHOSE_ACCESS_KEY")) {
        // Payload body is a JSON string, convert into a JavaScript Object
        const data = JSON.parse(payload.body.text())

        // Each record is a Base64 encoded JSON string
        const documents = data.records.map((record) => {
          const document = JSON.parse(decodeBase64(record.data))
          return {
            ...document,
            _id: new BSON.ObjectId(document._id)
          }
        })

        // Perform operations as a bulk
        const bulkOp = context.services.get("mongodb-atlas").db("monitors").collection("firehose").initializeOrderedBulkOp()
        documents.forEach((document) => {
          bulkOp.find({ _id:document._id }).upsert().updateOne(document)
        })

        response.addHeader(
          "Content-Type",
          "application/json"
        )

        bulkOp.execute().then(() => {
          // All operations completed successfully
          response.setStatusCode(200)
          response.setBody(JSON.stringify({
            requestId: payload.headers['X-Amz-Firehose-Request-Id'][0],
            timestamp: (new Date()).getTime()
          }))
          return
        }).catch((error) => {
          // Catch any error with execution and return a 500
          response.setStatusCode(500)
          response.setBody(JSON.stringify({
            requestId: payload.headers['X-Amz-Firehose-Request-Id'][0],
            timestamp: (new Date()).getTime(),
            // Convert to a string so the message survives JSON.stringify
            errorMessage: String(error)
          }))
          return
        })
      } else {
        // Validation error with Access Key
        response.setStatusCode(401)
        response.setBody(JSON.stringify({
          requestId: payload.headers['X-Amz-Firehose-Request-Id'][0],
          timestamp: (new Date()).getTime(),
          errorMessage: "Invalid X-Amz-Firehose-Access-Key"
        }))
        return
      }
    }

As you can see, Realm functions are mostly just vanilla JavaScript. We export a function which takes the request and response as arguments and returns the modified response.

One extra we do have within Realm functions is the global context object. This provides access to other Realm functions, values, and services. You may have noticed that the trigger function at the start of this article uses the context object to access our AWS service. In the code above, we use it to access the mongodb-atlas service and to retrieve our secret value. You can read more about what's available in the Realm context in our documentation.

#Decoding and Parsing the Payload Body

    // Payload body is a JSON string, convert into a JavaScript Object
    const data = JSON.parse(payload.body.text())

    // Each record is a Base64 encoded JSON string
    const documents = data.records.map((record) => {
      const document = JSON.parse(decodeBase64(record.data))
      return {
        ...document,
        _id: new BSON.ObjectId(document._id)
      }
    })

When we receive the POST request, we first have to convert the body—which is a JSON string—into a JavaScript object. Then we can iterate over each of the records.

The data in each of these records is Base64 encoded, so we have to decode it first.

Using Buffer() within Realm functions may currently cause a degradation in performance. For now, we do not recommend using Buffer to decode Base64 strings; instead, use a function such as decodeBase64() in the example above.

This data could be anything, whatever you've supplied in your Delivery Stream, but in this example, it is the MongoDB document sent from our Realm trigger. This document is also a JSON string, so we'll need to parse it back into a JavaScript object.
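
If the compact decodeBase64() in the WebHook is hard to follow, the same idea can be spelled out more verbosely. The version below is an illustrative sketch rather than the function used in this article. The decodeBase64Readable name is made up, and like the original it emits one character per decoded byte, so it suits ASCII payloads such as plain JSON but does not decode multi-byte UTF-8 sequences.

```javascript
const BASE64_ALPHABET =
  "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

// Hypothetical, more readable Base64 decoder that does not rely on Buffer.
function decodeBase64Readable(input) {
  const clean = input.replace(/=+$/, ""); // drop trailing padding
  let bits = 0;      // accumulator holding bits not yet emitted
  let bitCount = 0;  // how many bits in the accumulator are pending
  let output = "";

  for (const ch of clean) {
    const value = BASE64_ALPHABET.indexOf(ch);
    if (value === -1) {
      throw new Error(`Invalid Base64 character: ${ch}`);
    }
    // Each Base64 character contributes 6 bits
    bits = ((bits << 6) | value) & 0xffff;
    bitCount += 6;
    // Emit a byte as soon as 8 bits are available
    if (bitCount >= 8) {
      bitCount -= 8;
      output += String.fromCharCode((bits >> bitCount) & 0xff);
    }
  }
  return output;
}

console.log(decodeBase64Readable("eyJ0ZW1wZXJhdHVyZSI6MjEuNX0="));
```

Any leftover bits at the end are padding bits and are simply discarded.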

#Writing the Reports to MongoDB Atlas

Once the parsing and decoding are complete, we're left with an array of between 1 and 10,000 objects, depending on the size of the batch. It's tempting to pass this array to insertMany(), but there is the possibility that some records might already exist as documents in our collection.

Remember if Kinesis does not receive an HTTP status of 200 in response to a request it will, in the majority of cases, retry the batch. We have to take into account that there could be an issue after the documents have been written that prevents Kinesis from receiving the 200 OK status. If this occurs and we try to insert the document again, MongoDB will raise a Duplicate key error exception.

To prevent this we perform a find() and updateOne(), with upsert().

When updating/inserting a single document, you can use updateOne() with the upsert option.

    context.services.get("mongodb-atlas").db("monitors").collection("firehose").updateOne(
      {_id: document._id},
      document,
      {upsert: true}
    )

But we could potentially have to update/insert 10,000 records, so instead, we perform a bulk write.

    // Perform operations as a bulk
    const bulkOp = context.services.get("mongodb-atlas").db("monitors").collection("firehose").initializeOrderedBulkOp()
    documents.forEach((document) => {
      bulkOp.find({ _id:document._id }).upsert().updateOne(document)
    })
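
To see why upserting by _id makes a retried batch harmless, here is a small in-memory illustration. A Map stands in for the collection. It is not MongoDB code, and upsertAll is a made-up name.

```javascript
// In-memory stand-in for a collection keyed by _id (illustration only).
function upsertAll(store, documents) {
  for (const doc of documents) {
    // Insert if the _id is new, overwrite if it already exists
    store.set(String(doc._id), doc);
  }
  return store.size;
}

const store = new Map();
const batch = [
  { _id: "a1", temperature: 21.5 },
  { _id: "a2", temperature: 21.7 },
];

const sizeAfterFirstDelivery = upsertAll(store, batch);
// Firehose retries the same batch because it never saw the 200 response
const sizeAfterRetry = upsertAll(store, batch);

console.log(sizeAfterFirstDelivery, sizeAfterRetry);
```

The second delivery leaves the store unchanged, whereas a plain insert of the same batch would have failed on the duplicate keys.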

#Sending the Response

    bulkOp.execute().then(() => {
      // All operations completed successfully
      response.setStatusCode(200)
      response.setBody(JSON.stringify({
        requestId: payload.headers['X-Amz-Firehose-Request-Id'][0],
        timestamp: (new Date()).getTime()
      }))
      return
    })

If our write operations have completed successfully, we return an HTTP 200 status code with our response. Otherwise, we return a 500 and include the error message from the exception in the response body.

    .catch((error) => {
      // Catch any error with execution and return a 500
      response.setStatusCode(500)
      response.setBody(JSON.stringify({
        requestId: payload.headers['X-Amz-Firehose-Request-Id'][0],
        timestamp: (new Date()).getTime(),
        // Convert to a string so the message survives JSON.stringify
        errorMessage: String(error)
      }))
      return
    })

#Our WebHook URL

Now that we've finished writing our Realm Function, save and deploy it. Then, on the settings tab, copy the WebHook URL; we'll need it in just a moment.
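
Before wiring up Kinesis, it can help to send the WebHook a hand-made request that looks like a Firehose delivery. The sketch below only builds such a request. The buildTestRequest name, the example.com URL, the my-secret key, and the sample document are placeholders, so substitute your own WebHook URL and FIREHOSE_ACCESS_KEY value.

```javascript
// Hypothetical helper for manually testing the WebHook from Node.js.
function buildTestRequest(webhookUrl, accessKey, documents) {
  const requestId = "local-test-" + Date.now();
  const body = JSON.stringify({
    requestId,
    timestamp: Date.now(),
    // Each record carries one Base64 encoded JSON document
    records: documents.map((doc) => ({
      data: Buffer.from(JSON.stringify(doc), "utf8").toString("base64"),
    })),
  });
  return {
    url: webhookUrl,
    options: {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        "X-Amz-Firehose-Protocol-Version": "1.0",
        "X-Amz-Firehose-Request-Id": requestId,
        "X-Amz-Firehose-Access-Key": accessKey,
      },
      body,
    },
  };
}

const testRequest = buildTestRequest("https://example.com/webhook", "my-secret", [
  { _id: "5f2145b5e6c1b7c0e8a7d3f1", temperature: 21.5 },
]);
console.log(testRequest.options.headers);
// To send it (Node.js 18+): fetch(testRequest.url, testRequest.options)
```

A 200 response containing the same requestId would indicate that the access key check and the write both succeeded.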

#Creating an AWS Kinesis Delivery Stream

To create our Kinesis Delivery Stream we're going to use the AWS CLI, and you'll need the following information:

  • Your Kinesis Data Stream ARN
  • The ARNs of your respective IAM roles. Also ensure that the service principal firehose.amazonaws.com is allowed to assume these roles
  • Bucket and Role ARNs for the S3 bucket to be used for errors/backups
  • MongoDB Realm WebHook URL
  • The value of the FIREHOSE_ACCESS_KEY

Your final AWS CLI command will look something like this:

    aws firehose --endpoint-url "https://firehose.us-east-1.amazonaws.com" \
    create-delivery-stream --delivery-stream-name RealmDeliveryStream \
    --delivery-stream-type KinesisStreamAsSource \
    --kinesis-stream-source-configuration \
    "KinesisStreamARN=arn:aws:kinesis:us-east-1:78023564309:stream/realm,RoleARN=arn:aws:iam::78023564309:role/KinesisRealmRole" \
    --http-endpoint-destination-configuration \
    "RoleARN=arn:aws:iam::78023564309:role/KinesisFirehoseFullAccess,\
    S3Configuration={RoleARN=arn:aws:iam::78023564309:role/KinesisRealmRole, BucketARN=arn:aws:s3:::realm-kinesis},\
    EndpointConfiguration={\
    Url=https://webhooks.mongodb-stitch.com/api/client/v2.0/app/realmkinesis-aac/service/kinesis/incoming_webhook/kinesisDestination,\
    Name=RealmCloud,AccessKey=sdhfjkdbf347fb3icb34i243orn34fn234r23c}"

If everything executes correctly, you should see your new Delivery Stream appear in your Kinesis Dashboard. Also, after a few moments, the WebHook event will appear in your Realm logs and documents will begin to populate your collection!

Screenshot of Kinesis delivery stream dashboard
Screenshot of Realm logs
Screenshot of a collection in MongoDB Atlas

#Next Steps

With the Kinesis data now in MongoDB Atlas, we have a wealth of possibilities. We can transform it with aggregation pipelines, visualise it with Charts, turn it into a GraphQL API, or even trigger more Realm functions or services.

#Further reading

Now you've seen how you can use MongoDB Realm as an AWS Kinesis HTTP Endpoint you might find our other articles on using MongoDB with Kinesis useful:

If you haven't yet set up your free cluster on MongoDB Atlas, now is a great time to do so. You have all the instructions in this blog post.
