A pipeline execution triggered with a database trigger in Atlas

Hey guys,
I’d like to execute a pipeline in Atlas if any change in a source collection happened.
The code sample what I used in scheduled trigger doesn’t work, it causes error and trigger disabled.
The placeholder code in Database trigger / Functions doesn’t help similarly to the info available at:
https://docs.mongodb.com/realm/triggers/database-triggers.
Log shows its status is OK, but it didn’t do what it supposed to do.
A code sample would be really helpful and appreciated,
thank you!

Did you look here: Trigger Snippets — MongoDB Realm?

Hi @Nuri_Halperin

yes I did, there are 2 DB trigger snippets, different use cases.
What I’d like is if any change happens in source collection, execute a pipeline (which updates another collection based on data changed in first).
I try to use the code tweaked from a scheduled trigger.
Thank you.

Hi Vane, it might be helpful if you pasted your trigger code here to understand what exactly you’re trying to achieve and where you might need to make changes.

1 Like

Maybe someone find this useful:
https://docs.mongodb.com/realm/mongodb/run-aggregation-pipelines

Hi @Sumedha_Mehta1

I basically try to use the code I got here as a starter for scheduled triggers, supposing the function (triggered by a database trigger, by a collection change) to be executed might be the same as that one, but it is probably wrong.
The code ( without the pipeline, which is working properly by running manually ) is:

exports = async function() {
var db = <database>;
var sourceColl = <sourceCollection>;
var collection = context.services.get("Cluster0").db(db).collection(sourceColl);
        var pipeline =[
 
    ]; //Paste pipeline
    
await collection.aggregate(pipeline).toArray();
}

Thank you!

Hey Vane, I’m not sure exactly what your code is trying to do but based on what you said:

What I’d like is if any change happens in source collection, execute a pipeline (which updates another collection based on data changed in first).

Here is a general outline that should work for you -

  1. configure the Trigger to be a database trigger than runs on on whatever change you’re looking for (e.g. UPDATE and REPLACE)
  2. Link the collection that will source the change events
exports = async function(changeEvent) {
  
	const { updateDescription, fullDocument } = changeEvent;
        // prints updated document from 
	console.log("changed document: ", JSON.stringify(fullDocument));

       // get Collection you will make changes to
       var collection = context.services.get("Cluster0").db(db).collection(sourceColl);
       
       // 

       var pipeline =[
            //Paste pipeline that uses values from fullDocument
        ];

        var result = await collection.aggregate(pipeline).toArray()

}

When writing your function make sure it runs as a ‘System’ Function. Here are a few of the limitations of the aggregation pipeline - https://docs.mongodb.com/realm/mongodb/crud-and-aggregation-apis#aggregation-pipeline-stage-availability

Thank you @Sumedha_Mehta1 for your response! :slight_smile:

I added 2 more lines to your code defining db and sourceColl so it starts this way:

exports = async function(changeEvent) {
  var db = "database";
  var sourceColl = "sourceCollection";

I changed all 3 of my database type Atlas triggers’ functions according to your code,
but I still got all database triggers “Suspended”
First, after restarting with resume token I got:
“(InvalidResumeToken) Attempting to resume a change stream using ‘resumeAfter’ is not allowed from an invalidate notification.”
after restarting without resume token I got:
“change stream was invalidated”

In Realm Logs every each database trigger was fired once with status = OK with log content of:
[
“changed document: undefined”
]
and after that were suspended.
All 3 database triggered functions run as system user and none of the functions contain pipelines with staging type of:
$indexStats.
It’s run on a free M0 cluster.

So I still don’t know what’s wrong.
Any more help is really appreciated, thank you! :slight_smile:

Hi Vane - did you resuming the Triggers with the directions listed here - https://docs.mongodb.com/realm/triggers/database-triggers#restart-a-database-trigger

It’s also possible that you may have made changes to your cluster (things like dropDatabase and renameCollection) will invalidate events.

Can you try to delete your current triggers and create new ones and see if this issue persists? Otherwise this might be beyond the scope of the forum and I would suggest you open a support ticket.

Hi @Sumedha_Mehta1,

thank you for the response!

  • I did try to restart triggers, like I wrote I tried both with and without using resume token.
    That time I made it from Atlas, now I repeated it in Realm, they became suspended after a short time.
  • I didn’t made any change in cluster, BUT I DID change some of the collection names!
    Isn’t there any way to fix this?
  • I also deleted all triggers and functions, I double checked them both in Atlas and Realm and recreated them. I also created a new database trigger with new function. ALL of them are still turn suspended in a short while!

I have a collection storing raw ecommerce data and I use aggregation pipeline to create pre-aggregated data in new collections.
If you recommend, I export my raw data collection, drop all old collections and DB, create a new DB with a new collection, import documents in it, modify existing pipelines and functions and recreate triggers.
Do you think it works?

EDITED:
I also created new collections storing aggregated data, so their name is surely not modified, then I changed functions in Atlas accordingly and restarted all 3 database triggers, both with and without resume token.
All 3 function processed once their task, so the target collections got their documents, but after that database triggers became suspended, and even after new restarts they don’t process new documents!

Thank you!