Scheduled aggregation best practices

Hey everyone,

I’m sorry for the total noob question, but after days of docs reading I wasn’t able to decide what is the best way to run scheduled aggregation on data stored in Atlas.

I have aggregation pipeline set in Aggregation Pipeline Builder, basically I’d like to run that by schedule and save aggregated data in a separate new db/collection.

Should I have to use Realm / Functions or is there an easier way?

Thank you in advance!

1 Like

Hi @Vane_T,

I have written a blog that exactly demonstrate this ability with Atlas scheduled triggers.

This is not hard in stone and its mostly a concept, hopes it is useful:
Spirits in the Materialized View: Automatic Refresh of Materialized Views | MongoDB Blog

Let me know if something is not clear.

Thanks
Pavel

2 Likes

Hi @Pavel_Duchovny

thank you for you response! :slight_smile:

I will study it, but not being a developer with only some familiarity with it I was hoping for some copy paste code like ones available in Aggregation Pipeline Builder where we can “Export Pipeline To Language”, so I supposed there might be some such obvious solution for us,
where I copy that code in some format, and trigger its execution with the frequency I want in a schedule.
I understand it is mainly for devs with a min. knowledge, but not all guys who control a budget are on that level :slight_smile:
I use the code I got from there for Node.js and run it from Visual Studio Code, the MongoDB connection is OK, the code runs there in VSC without error, but the aggregation is not executed at all (checking it in Atlas).
I was hoping if I can execute it manually then I can copy it and trigger its run by an Atlas trigger.
Any more idea is really appreciated though and thank you again! :slight_smile:

1 Like

Hi @Vane_T,

For a straight forward code could look something like , paste your details and pipeline

async  function()
{
var db = "db";
var sourceColl = "sourceColl";
var collection = context.services.get("<Atlas-service>").db(db).collection(sourceColl);
        var pipeline =[...] //Paste pipeline
    
await collection.aggregate(pipeline).toArray();


}

Best
Pavel

2 Likes

Thank you @Pavel_Duchovny,

I created a schedule trigger in Atlas, set link data source to Cluster0, added your code to Functions area (above the existing sample code as new function), set “db” as source database name, “sourceColl” as source collection, "<Atlas-service>" as “Cluster0” ( like comment is sample function code referred to and which cluster I use ), pasted in pipeline text (not Node.js code, but only the pipeline object), removed a of 2 to set
var pipeline =[...]
properly - I think this is the correct way…
I gave a name to the function ( I was asked to do ).
I have 2 yellow warning related to missing semicolons, I couldn’t figure out why is it missed:
in line:
async function aggregation() {
and line:
await collection.aggregate(pipeline).toArray();

After saving I was waiting for its run ( 1 min scheduled) , it didn’t happen, then I clicked on Run button of Console and I got:
“> ran on Fri Jan 08 2021 21:56:59 GMT+0100 (Central European Standard Time)
> took 315.087827ms
> result:
{
“$undefined”: true
}
> result (JavaScript):
EJSON.parse(’{”$undefined":true}’)
Do you see some mistake I made?
Thank you! :slight_smile:

1 Like

Hi @Vane_T,

You should place the database name and the source coll as your data source you have built the builder pipeline on.

Can you share the pipeline?

Additionally I think the function definition should look like exports = async function ()..

Thanks
Pavel

2 Likes

Hi @Pavel_Duchovny,

my latest varsion of full code I inserted above sample function:

exports = async function aggregation() {
var db = "WoocommerceRiskViaIntegromat";
var sourceColl = "ordersExtended";
var collection = context.services.get("Cluster0").db(db).collection(sourceColl);
        var pipeline =[
    {
    '$project': {
      'businessMeta.client_id': true, 
      'businessMeta.client_name': true, 
      'businessMeta.webshop_id': true, 
      'businessMeta.webshop_name': true, 
      'numVerify.valid': true, 
      'numVerify.international_format': true, 
      'numVerify.country_prefix': true, 
      'numVerify.country_code': true, 
      'numVerify.location': true, 
      'order.id': true, 
      'order.status': true, 
      'order.currency': true, 
      'order.total': true, 
      'order.date_created': true, 
      'order.date_modified': true, 
      'order.date_paid': true, 
      'order.date_completed': true, 
      'order.customer_id': true, 
      'order.customer_ip_address': true, 
      'order.customer_user_agent': true, 
      'order.customer_note': true, 
      'order.payment_method': true
    }
  }, {
    '$sort': {
      'order.date_modified': -1
    }
  }, {
    '$out': 'ordersProcessedAndSorted'
  }
  ]; //Paste pipeline
    
await collection.aggregate(pipeline).toArray();

}

Thank you!

1 Like

@Vane_T,

That’s look correct. What is the problem?

Thanks
Pavel

1 Like

@Pavel_Duchovny,

  • it has 2 syntax warnings I referred to ( misses semicolons; pls. see above )
  • after running it by clicking on button “Run” at console it gives error:

    result:
    {
    “$undefined”: true
    }
    result (JavaScript):
    EJSON.parse(‘{“$undefined”:true}’)

  • it doesn’t run on schedule
  • it doesn’t show new documents in target collection which was added to source collection

so simply it doesn’t work, yet! :slight_smile:

Can you send us a link to your triggers definition.

1 Like

@Pavel_Duchovny
I’m sorry, I don’t understand how can I send you a link to a trigger definition.
Also, I mentioned, I can not run it even from its console:
/*
To Run the function:
- Type ‘exports();’
- Click ‘Run’
*/

Anyway it is a Scheduled, Basic trigger type, Enabled, Cluster0 is linked in Link Data Source(s),
schedule is 1 minutes default and event type is Function

1 Like

Hi @Vane_T,

When you are on the trigger definition page just copy the url from the browser and paste it here.

Thanks
Pavel

1 Like

@Pavel_Duchovny

here you are, thank you!
https://cloud.mongodb.com/v2/5fdcf706a8d3a31512f87528#triggers/5ff8c10a106b1ff8837e7844

1 Like

Hi @Vane_T,

I think you have some malformed code for the specific trigger syntax. The current code executes the second function (line 43) which is empty. The idea is to replace this function and not add to it.

  1. The first line should be exports = async function() {
  2. Delete everything from line 43 and on.

Sorry if my code snippet mislead you I wrote it from memory.

Thanks
Pavel

2 Likes

Hi @Pavel_Duchovny

I’ve already tried your code earlier both above and below sample code and also I tried with sample deleted, like you suggest.
I did what you asked in your last and I got 2 warnings for line 1, 1 warning for line 40 and I still got error message in console:
> ran on Sat Jan 09 2021 17:27:41 GMT+0100 (Central European Standard Time)
> took 647.080429ms
> result:
{
“$undefined”: true
}
> result (JavaScript):
EJSON.parse(’{"$undefined":true}’)
I see though that in target collection there is 11 docs now, that’s good, so it was executed (at least) one time some times ago.

LATER: I generated 1 more document into source collection and now in both source and in aggregation there are also 12 docs, so the triggered function looks working now :slight_smile:
I made some further indentation changes in pipeline, nothing else.

Pls. note the 2+1 warning in code editor and the console error message is still very confusing, so my issue is solved, but others might also find it confusing…

Thank you for your help again! :slight_smile:

1 Like

Hi @Vane_T,

This is not an error. The execution output is expected as the function does not return anything.

You can look at the logs tab and see that the trigger runs with no errors.

Thanks
Pavel

1 Like

This topic was automatically closed 5 days after the last reply. New replies are no longer allowed.