Group aggregation slow

I have 600 million records and I am trying to run a $group stage, and it's taking 30 minutes.
Stage 1->

$match:{
is_workflow_processing: false ,
is_error: true
}

Stage 2->

$group:{
_id: {
status: "$status",
Risk: "$control_monitorkey",
User: "$Masterid",
AssetID: "$SYSTEMID"
},
cnt: { $sum: 1 }
}

Stage 3->

$facet:{
rawData: [
{
$project: {
Status: "$_id.status",
RiskName: "$_id.Risk",
userId: "$_id.User",
assetId: "$_id.AssetID",
ExceptionCount: "$cnt",
_id: 0
}
},
{
$sort: {
ExceptionCount: -1
}
},
{
$skip: 0
},
{
$limit: 1000
}
],
count: [ { $count: "sum" } ]
}

Note-> An index is created on the is_workflow_processing and is_error fields.
Server details-> 64 GB RAM, 16-core CPU, MongoDB version 4.2

Hi @Narendra_S_Sikarwar and welcome to the MongoDB Community :muscle:!

If you already have the compound index {is_workflow_processing:1, is_error:1}, then your pipeline looks pretty much optimized here.

Maybe you could try creating a bigger index and adding a $sort before the $group, so that the documents entering the $group stage are already sorted, which could possibly help the algorithm.

Take this pipeline as an example:

[
  {
    '$match': {
      'country_code': 840
    }
  }, {
    '$sort': {
      'country': 1, 
      'state': 1, 
      'county': 1
    }
  }, {
    '$group': {
      '_id': {
        'c': '$country', 
        's': '$state', 
        'cc': '$county'
      }, 
      'count': {
        '$sum': 1
      }
    }
  }, {
    '$project': {
      '_id': 0, 
      'country': '$_id.c', 
      'state': '$_id.s', 
      'county': '$_id.cc', 
      'count': '$count'
    }
  }, {
    '$sort': {
      'count': -1
    }
  }, {
    '$limit': 100
  }
]

This pipeline can use the index {country_code: 1, country:1, state:1, county: 1}. Try with and without the $sort operation and see if you have any improvements.
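To make the with/without comparison easy to repeat, here is a minimal sketch that builds both variants of the example pipeline as plain Python structures. The helper name `build_pipeline` is hypothetical, and the commented-out pymongo call assumes a `collection` handle that is not defined here.

```python
def build_pipeline(pre_sort):
    """Build the example pipeline, optionally with a $sort before the $group."""
    pipeline = [{"$match": {"country_code": 840}}]
    if pre_sort:
        # Same field order as the trailing keys of the suggested index.
        pipeline.append({"$sort": {"country": 1, "state": 1, "county": 1}})
    pipeline += [
        {"$group": {
            "_id": {"c": "$country", "s": "$state", "cc": "$county"},
            "count": {"$sum": 1},
        }},
        {"$project": {
            "_id": 0,
            "country": "$_id.c",
            "state": "$_id.s",
            "county": "$_id.cc",
            "count": "$count",
        }},
        {"$sort": {"count": -1}},
        {"$limit": 100},
    ]
    return pipeline


with_sort = build_pipeline(pre_sort=True)
without_sort = build_pipeline(pre_sort=False)
print(len(with_sort), len(without_sort))  # 6 5

# Assumed usage with pymongo (not run here), timing each variant:
#   results = list(collection.aggregate(with_sort))
```

Running each variant a few times and comparing wall-clock time (or the explain output) should show whether the extra $sort helps or hurts on a given data set.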

Otherwise, the last obvious solution is to reduce the number of documents at the $match stage and support it with an index.

I hope this helps. You can read the discussions in this ticket for more details.

Cheers,
Maxime.


Hey @Narendra_S_Sikarwar,

I actually gave this a second thought and I think I found a faster way to deal with this: by using a covered query.

  1. I created this little Python script that generates some fake data and creates the index: {is_workflow_processing:1, is_error:1, status: 1, control_monitorkey:1, Masterid:1, SYSTEMID:1}
from faker import Faker
from pymongo import ASCENDING
from pymongo import MongoClient

fake = Faker()


def random_docs():
    docs = []
    for _ in range(10000):
        doc = {
            'firstname': fake.first_name(),
            'lastname': fake.last_name(),
            'is_workflow_processing': fake.pybool(),
            'is_error': fake.pybool(),
            'status': fake.pyint(),
            'control_monitorkey': fake.pyint(),
            'Masterid': fake.pyint(),
            'SYSTEMID': fake.pyint()}
        docs.append(doc)
    return docs


if __name__ == '__main__':
    client = MongoClient()
    collection = client.test.coll
    collection.insert_many(random_docs())
    collection.create_index([("is_workflow_processing", ASCENDING), ("is_error", ASCENDING), ("status", ASCENDING),
                             ("control_monitorkey", ASCENDING), ("Masterid", ASCENDING), ("SYSTEMID", ASCENDING), ])
    print('Done!')

In your case, the index might be a little fat but you have 64GB of RAM… So it should get the job done fast with the following aggregation pipeline:

  2. Cover the query with a $match + $group that only use a subset of the fields.
    Note that we don't need a $project right after the $match, because the pipeline is already optimized automatically to read only the fields it needs.
[
  {
    '$match': {
      'is_workflow_processing': false, 
      'is_error': true
    }
  }, {
    '$group': {
      '_id': {
        'status': '$status', 
        'control_monitorkey': '$control_monitorkey', 
        'Masterid': '$Masterid', 
        'SYSTEMID': '$SYSTEMID'
      }, 
      'count': {
        '$sum': 1
      }
    }
  }, {
    '$project': {
      '_id': 0, 
      'status': '$_id.status', 
      'control_monitorkey': '$_id.control_monitorkey', 
      'Masterid': '$_id.Masterid', 
      'SYSTEMID': '$_id.SYSTEMID', 
      'count': 1
    }
  }, {
    '$sort': {
      'count': -1
    }
  }, {
    '$limit': 5
  }
]

Here is the explain plan of this aggregation:

{
	"stages" : [
		{
			"$cursor" : {
				"queryPlanner" : {
					"plannerVersion" : 1,
					"namespace" : "test.coll",
					"indexFilterSet" : false,
					"parsedQuery" : {
						"$and" : [
							{
								"is_error" : {
									"$eq" : true
								}
							},
							{
								"is_workflow_processing" : {
									"$eq" : false
								}
							}
						]
					},
					"queryHash" : "86D43161",
					"planCacheKey" : "AF368344",
					"winningPlan" : {
						"stage" : "PROJECTION_COVERED",
						"transformBy" : {
							"Masterid" : 1,
							"SYSTEMID" : 1,
							"control_monitorkey" : 1,
							"status" : 1,
							"_id" : 0
						},
						"inputStage" : {
							"stage" : "IXSCAN",
							"keyPattern" : {
								"is_workflow_processing" : 1,
								"is_error" : 1,
								"status" : 1,
								"control_monitorkey" : 1,
								"Masterid" : 1,
								"SYSTEMID" : 1
							},
							"indexName" : "is_workflow_processing_1_is_error_1_status_1_control_monitorkey_1_Masterid_1_SYSTEMID_1",
							"isMultiKey" : false,
							"multiKeyPaths" : {
								"is_workflow_processing" : [ ],
								"is_error" : [ ],
								"status" : [ ],
								"control_monitorkey" : [ ],
								"Masterid" : [ ],
								"SYSTEMID" : [ ]
							},
							"isUnique" : false,
							"isSparse" : false,
							"isPartial" : false,
							"indexVersion" : 2,
							"direction" : "forward",
							"indexBounds" : {
								"is_workflow_processing" : [
									"[false, false]"
								],
								"is_error" : [
									"[true, true]"
								],
								"status" : [
									"[MinKey, MaxKey]"
								],
								"control_monitorkey" : [
									"[MinKey, MaxKey]"
								],
								"Masterid" : [
									"[MinKey, MaxKey]"
								],
								"SYSTEMID" : [
									"[MinKey, MaxKey]"
								]
							}
						}
					},
					"rejectedPlans" : [ ]
				}
			}
		},
		{
			"$group" : {
				"_id" : {
					"status" : "$status",
					"control_monitorkey" : "$control_monitorkey",
					"Masterid" : "$Masterid",
					"SYSTEMID" : "$SYSTEMID"
				},
				"count" : {
					"$sum" : {
						"$const" : 1
					}
				}
			}
		},
		{
			"$project" : {
				"count" : true,
				"status" : "$_id.status",
				"control_monitorkey" : "$_id.control_monitorkey",
				"Masterid" : "$_id.Masterid",
				"SYSTEMID" : "$_id.SYSTEMID",
				"_id" : false
			}
		},
		{
			"$sort" : {
				"sortKey" : {
					"count" : -1
				},
				"limit" : NumberLong(5)
			}
		}
	],
	"serverInfo" : {
		"host" : "hafx",
		"port" : 27017,
		"version" : "4.4.1",
		"gitVersion" : "ad91a93a5a31e175f5cbf8c69561e788bbc55ce1"
	},
	"ok" : 1,
	"$clusterTime" : {
		"clusterTime" : Timestamp(1602536597, 4),
		"signature" : {
			"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
			"keyId" : NumberLong(0)
		}
	},
	"operationTime" : Timestamp(1602536597, 4)
}

As you can see above, you only have an IXSCAN stage followed by a PROJECTION_COVERED stage. There is no FETCH stage, which means that no documents are retrieved from the collection on disk. The query only uses the content of the index, which is served from RAM as long as the index fits in the cache.
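The same check can be done programmatically. The sketch below walks the winning plan of an explain document shaped like the one above (the 4.2/4.4 layout with a `$cursor` stage; other server versions may lay the output out differently) and lists the stage names. The helper names `plan_stages` and `winning_plan` are hypothetical.

```python
def plan_stages(plan):
    """Return the stage names of a query plan, outermost first."""
    stages = [plan["stage"]]
    # Most stages have one child under "inputStage"; some use "inputStages".
    children = list(plan.get("inputStages", []))
    if "inputStage" in plan:
        children.insert(0, plan["inputStage"])
    for child in children:
        stages.extend(plan_stages(child))
    return stages


def winning_plan(explain):
    """Extract the winning plan from an aggregation explain with a $cursor stage."""
    for stage in explain.get("stages", []):
        if "$cursor" in stage:
            return stage["$cursor"]["queryPlanner"]["winningPlan"]
    raise ValueError("no $cursor stage found in explain output")


# Trimmed-down copy of the explain output above.
explain = {
    "stages": [
        {"$cursor": {"queryPlanner": {"winningPlan": {
            "stage": "PROJECTION_COVERED",
            "inputStage": {"stage": "IXSCAN"},
        }}}},
        {"$group": {}},
    ]
}

stages = plan_stages(winning_plan(explain))
is_covered = "FETCH" not in stages and "COLLSCAN" not in stages
print(stages)      # ['PROJECTION_COVERED', 'IXSCAN']
print(is_covered)  # True
```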

If this query doesn't run in under a second, I don't get it!

Cheers,
Maxime.

Hi @MaBeuLux88, thanks for your guidance.
1: I tried adding a $sort as the second stage (after $match, before $group), but it takes more time. Without the $sort it's better.
2: I tried adding a $project (after $match, before $group); it also takes more time.
3: I tried chunking the data (for example, using "DATE": {$gte: ISODate("2013-01-07T00:00:00.000+0000")} in the first stage), but it is still slow.
I don't know what the issue is. Everything is slow at the second stage, which is the $group.

Can you share an explain plan? Which index are you using?
Without the covering index, you most probably have a FETCH stage in your execution plan, which needs to retrieve all the documents from disk. That is slow, and it also puts a lot of pressure on the WiredTiger (WT) cache.

{ 
    "stages" : [
        {
            "$cursor" : {
                "query" : {
                    "workflow_stage_current_assignee" : "gladmin", 
                    "is_workflow_processing" : false, 
                    "is_error" : true, 
                    "is_delegated" : false, 
                    "is_escalated" : false
                }, 
                "fields" : {
                    "Masterid" : NumberInt(1), 
                    "SYSTEMID" : NumberInt(1), 
                    "control_monitorkey" : NumberInt(1), 
                    "status" : NumberInt(1), 
                    "_id" : NumberInt(0)
                }, 
                "queryPlanner" : {
                    "plannerVersion" : NumberInt(1), 
                    "namespace" : "GLT_Narendra.EXCEPTIONS", 
                    "indexFilterSet" : false, 
                    "parsedQuery" : {
                        "$and" : [
                            {
                                "is_delegated" : {
                                    "$eq" : false
                                }
                            }, 
                            {
                                "is_error" : {
                                    "$eq" : true
                                }
                            }, 
                            {
                                "is_escalated" : {
                                    "$eq" : false
                                }
                            }, 
                            {
                                "is_workflow_processing" : {
                                    "$eq" : false
                                }
                            }, 
                            {
                                "workflow_stage_current_assignee" : {
                                    "$eq" : "gladmin"
                                }
                            }
                        ]
                    }, 
                    "queryHash" : "7203DD95", 
                    "planCacheKey" : "9EA1C330", 
                    "winningPlan" : {
                        "stage" : "FETCH", 
                        "inputStage" : {
                            "stage" : "IXSCAN", 
                            "keyPattern" : {
                                "workflow_stage_current_assignee" : NumberInt(1), 
                                "is_workflow_processing" : NumberInt(1), 
                                "is_error" : NumberInt(1), 
                                "is_delegated" : NumberInt(1), 
                                "is_escalated" : NumberInt(1)
                            }, 
                            "indexName" : "workflow_stage_current_assignee_1_is_workflow_processing_1_is_error_1_is_delegated_1_is_escalated_1", 
                            "isMultiKey" : false, 
                            "multiKeyPaths" : {
                                "workflow_stage_current_assignee" : [

                                ], 
                                "is_workflow_processing" : [

                                ], 
                                "is_error" : [

                                ], 
                                "is_delegated" : [

                                ], 
                                "is_escalated" : [

                                ]
                            }, 
                            "isUnique" : false, 
                            "isSparse" : false, 
                            "isPartial" : false, 
                            "indexVersion" : NumberInt(2), 
                            "direction" : "forward", 
                            "indexBounds" : {
                                "workflow_stage_current_assignee" : [
                                    "[\"gladmin\", \"gladmin\"]"
                                ], 
                                "is_workflow_processing" : [
                                    "[false, false]"
                                ], 
                                "is_error" : [
                                    "[true, true]"
                                ], 
                                "is_delegated" : [
                                    "[false, false]"
                                ], 
                                "is_escalated" : [
                                    "[false, false]"
                                ]
                            }
                        }
                    }, 
                    "rejectedPlans" : [

                    ]
                }
            }
        }, 
        {
            "$group" : {
                "_id" : {
                    "status" : "$status", 
                    "Risk" : "$control_monitorkey", 
                    "User" : "$Masterid", 
                    "AssetID" : "$SYSTEMID"
                }, 
                "cnt" : {
                    "$sum" : {
                        "$const" : 1.0
                    }
                }
            }
        }
    ], 
    "serverInfo" : {
        "host" : "XXXXXX", 
        "port" : NumberInt(27017), 
        "version" : "4.2.6", 
        "gitVersion" : "20364840b8f1af16917e4c23c1b5f5efd8b352f8"
    }, 
    "ok" : 1.0, 
    "$clusterTime" : {
        "clusterTime" : Timestamp(1602593872, 1), 
        "signature" : {
            "hash" : BinData(0, "jTuRMJ6j3jr6GDKbPn8Vykf1zU0="), 
            "keyId" : NumberLong(6828817573358862341)
        }
    }, 
    "operationTime" : Timestamp(1602593872, 1)
}

You have a FETCH in this explain plan. It means the index isn’t covering all the fields you need for this query and it has to fetch all the documents from disk that you didn’t eliminate in the $match stage.

You need to evaluate whether the index with all the fields will be too big or not. Depending on the cardinality of each field, it can get bigger or smaller if you switch the order of the fields, but I guess the first 5 must be the ones you currently have, so the index still supports the $match correctly.

I tried changing the order of the fields in my $match stage, and it gives me output like this:

{ 
    "stages" : [
        {
            "$cursor" : {
                "query" : {
                    "is_workflow_processing" : false, 
                    "is_error" : true, 
                    "is_escalated" : false, 
                    "is_delegated" : false, 
                    "workflow_stage_current_assignee" : "gladmin", 
                    "TX_DATE" : {
                        "$gte" : ISODate("2017-06-19T00:00:00.000+0000")
                    }
                }, 
                "fields" : {
                    "Masterid" : NumberInt(1), 
                    "SYSTEMID" : NumberInt(1), 
                    "control_monitorkey" : NumberInt(1), 
                    "status" : NumberInt(1), 
                    "_id" : NumberInt(0)
                }, 
                "queryPlanner" : {
                    "plannerVersion" : NumberInt(1), 
                    "namespace" : "GLT_Narendra.EXCEPTIONS", 
                    "indexFilterSet" : false, 
                    "parsedQuery" : {
                        "$and" : [
                            {
                                "is_delegated" : {
                                    "$eq" : false
                                }
                            }, 
                            {
                                "is_error" : {
                                    "$eq" : true
                                }
                            }, 
                            {
                                "is_escalated" : {
                                    "$eq" : false
                                }
                            }, 
                            {
                                "is_workflow_processing" : {
                                    "$eq" : false
                                }
                            }, 
                            {
                                "workflow_stage_current_assignee" : {
                                    "$eq" : "gladmin"
                                }
                            }, 
                            {
                                "TX_DATE" : {
                                    "$gte" : ISODate("2017-06-19T00:00:00.000+0000")
                                }
                            }
                        ]
                    }, 
                    "queryHash" : "67F7F482", 
                    "planCacheKey" : "172BB4B5", 
                    "winningPlan" : {
                        "stage" : "PROJECTION_COVERED", 
                        "transformBy" : {
                            "Masterid" : NumberInt(1), 
                            "SYSTEMID" : NumberInt(1), 
                            "control_monitorkey" : NumberInt(1), 
                            "status" : NumberInt(1), 
                            "_id" : NumberInt(0)
                        }, 
                        "inputStage" : {
                            "stage" : "IXSCAN", 
                            "keyPattern" : {
                                "is_workflow_processing" : 1.0, 
                                "is_error" : 1.0, 
                                "is_delegated" : 1.0, 
                                "is_escalated" : 1.0, 
                                "workflow_stage_current_assignee" : 1.0, 
                                "TX_DATE" : 1.0, 
                                "status" : 1.0, 
                                "Masterid" : 1.0, 
                                "SYSTEMID" : 1.0, 
                                "control_monitorkey" : 1.0
                            }, 
                            "indexName" : "inboxaggregatetest", 
                            "isMultiKey" : false, 
                            "multiKeyPaths" : {
                                "is_workflow_processing" : [

                                ], 
                                "is_error" : [

                                ], 
                                "is_delegated" : [

                                ], 
                                "is_escalated" : [

                                ], 
                                "workflow_stage_current_assignee" : [

                                ], 
                                "TX_DATE" : [

                                ], 
                                "status" : [

                                ], 
                                "Masterid" : [

                                ], 
                                "SYSTEMID" : [

                                ], 
                                "control_monitorkey" : [

                                ]
                            }, 
                            "isUnique" : false, 
                            "isSparse" : false, 
                            "isPartial" : false, 
                            "indexVersion" : NumberInt(2), 
                            "direction" : "forward", 
                            "indexBounds" : {
                                "is_workflow_processing" : [
                                    "[false, false]"
                                ], 
                                "is_error" : [
                                    "[true, true]"
                                ], 
                                "is_delegated" : [
                                    "[false, false]"
                                ], 
                                "is_escalated" : [
                                    "[false, false]"
                                ], 
                                "workflow_stage_current_assignee" : [
                                    "[\"gladmin\", \"gladmin\"]"
                                ], 
                                "TX_DATE" : [
                                    "[new Date(1497830400000), new Date(9223372036854775807)]"
                                ], 
                                "status" : [
                                    "[MinKey, MaxKey]"
                                ], 
                                "Masterid" : [
                                    "[MinKey, MaxKey]"
                                ], 
                                "SYSTEMID" : [
                                    "[MinKey, MaxKey]"
                                ], 
                                "control_monitorkey" : [
                                    "[MinKey, MaxKey]"
                                ]
                            }
                        }
                    }, 
                    "rejectedPlans" : [

                    ]
                }
            }
        }, 
        {
            "$group" : {
                "_id" : {
                    "status" : "$status", 
                    "Risk" : "$control_monitorkey", 
                    "User" : "$Masterid", 
                    "AssetID" : "$SYSTEMID"
                }, 
                "cnt" : {
                    "$sum" : {
                        "$const" : 1.0
                    }
                }
            }
        }
    ], 
    "serverInfo" : {
        "host" : "GLT-S206", 
        "port" : NumberInt(27017), 
        "version" : "4.2.6", 
        "gitVersion" : "20364840b8f1af16917e4c23c1b5f5efd8b352f8"
    }, 
    "ok" : 1.0, 
    "$clusterTime" : {
        "clusterTime" : Timestamp(1602763420, 1), 
        "signature" : {
            "hash" : BinData(0, "f2I2ge99mQqmy4QFbvdvubTKfvI="), 
            "keyId" : NumberLong(6828817573358862341)
        }
    }, 
    "operationTime" : Timestamp(1602763420, 1)
}

@MaBeuLux88, could you give me some idea of what I should do and where I can start to optimize this?

Also, what is the difference between "stage": "PROJECTION_COVERED" and "stage": "FETCH" in this explain plan?

FETCH means that MongoDB is fetching, from disk, the full documents it needs to continue the pipeline. This generates a lot of IOPS if many documents are fetched, and even more if these documents are big. All these documents are loaded into RAM, which can evict other documents in the working set from the cache; this isn't ideal and generates cache pressure.

PROJECTION_COVERED means that the pipeline defines, directly or indirectly, the fields it needs to do its work and produce its output. Because of this, if all these fields are present in an index, ONLY the content of the index is required to perform the aggregation and the FETCH step isn't necessary. So for this kind of query the disk isn't touched, everything works in RAM, and it's supposed to be way faster than the same pipeline with a FETCH operation, given that you have enough CPU and RAM available for the entirety of this index.
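As a rough illustration of that rule, the sketch below compares the fields a pipeline needs with the keys of an index, using the two indexes from the explain plans earlier in this thread. The helper `missing_from_index` is hypothetical, and the check is a simplification: it only compares field names and ignores other conditions that can prevent a covered plan, such as array (multikey) fields.

```python
def missing_from_index(index_keys, needed_fields):
    """Return the needed fields that are absent from the index, sorted."""
    return sorted(set(needed_fields) - set(index_keys))


match_fields = ["workflow_stage_current_assignee", "is_workflow_processing",
                "is_error", "is_delegated", "is_escalated"]
group_fields = ["status", "control_monitorkey", "Masterid", "SYSTEMID"]

# Index from the first explain plan (winning plan contains FETCH).
first_index = list(match_fields)

# Index "inboxaggregatetest" from the second explain plan (PROJECTION_COVERED).
second_index = ["is_workflow_processing", "is_error", "is_delegated",
                "is_escalated", "workflow_stage_current_assignee", "TX_DATE",
                "status", "Masterid", "SYSTEMID", "control_monitorkey"]

needed = match_fields + group_fields
missing_first = missing_from_index(first_index, needed)
missing_second = missing_from_index(second_index, needed + ["TX_DATE"])

print(missing_first)   # ['Masterid', 'SYSTEMID', 'control_monitorkey', 'status']
print(missing_second)  # []
```

The first index lacks all four $group fields, which matches the FETCH in its plan; the second contains every field the pipeline touches.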

What’s the execution time here? Is it better now?

Also, do you still have enough RAM for the indexes + working set + queries? Is the CPU saturated?

I hope this helps :smiley: !

Cheers,
Maxime.

The execution time is 20 minutes. I am going to reduce the number of fields in the $group; maybe this will give me a better solution. I hope this helps…

When you are running this query, what is saturated? Is it the CPU or the RAM? If it’s running 100% on RAM, it should not be the disk. Does the index fit in RAM? If it’s a covered query and the index fits in RAM, it should take a lot less and you should see almost zero IOPS.
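For a back-of-the-envelope answer to "does the index fit in RAM?", one option is to compare the index size against the default WiredTiger cache size, which is the larger of 50% of (RAM - 1 GB) and 256 MB unless it has been configured otherwise. The sketch below assumes that default and uses made-up index sizes; the real figure can be read from the `indexSizes` field of the collection stats. The cache is shared with the rest of the working set, so this is an upper bound, not a guarantee.

```python
GIB = 1024 ** 3
MIB = 1024 ** 2


def default_wt_cache_bytes(ram_bytes):
    """Default WiredTiger cache size: max(50% of (RAM - 1 GiB), 256 MiB)."""
    return max((ram_bytes - GIB) // 2, 256 * MIB)


def index_fits_in_cache(index_size_bytes, ram_bytes):
    """True if the index is no larger than the default WiredTiger cache."""
    return index_size_bytes <= default_wt_cache_bytes(ram_bytes)


ram = 64 * GIB  # the server in this thread has 64 GB of RAM
cache = default_wt_cache_bytes(ram)
print(cache / GIB)  # 31.5

# Hypothetical index sizes, for illustration only.
print(index_fits_in_cache(20 * GIB, ram))  # True
print(index_fits_in_cache(40 * GIB, ram))  # False
```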

The query takes 50% of the RAM, and the CPU is at 100-150% out of 800% (assuming an 8-core CPU).
Sometimes the explain plan gives me a PROJECTION_COVERED stage and sometimes a PROJECTION_DEFAULT stage. It depends on which fields I am using in the $group stage.

I think I am going to create a compound index that includes all the fields in the $match stage and all the fields in the $group stage, which gives me a PROJECTION_COVERED stage in my winning plan.
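A minimal sketch of that plan: build the key list for the compound index from the $match fields followed by the $group fields, in the `(field, direction)` form that pymongo's `create_index` accepts. The helper `covering_index_keys` is hypothetical, the field order mirrors the `inboxaggregatetest` index shown earlier, and the commented-out call assumes a `collection` handle that is not defined here.

```python
ASCENDING = 1  # same value as pymongo.ASCENDING


def covering_index_keys(match_fields, group_fields):
    """Return index keys: $match fields first, then $group fields, no duplicates."""
    ordered = []
    for field in list(match_fields) + list(group_fields):
        if field not in ordered:
            ordered.append(field)
    return [(field, ASCENDING) for field in ordered]


# Equality filters first, then the range filter on TX_DATE.
match_fields = ["is_workflow_processing", "is_error", "is_delegated",
                "is_escalated", "workflow_stage_current_assignee", "TX_DATE"]
group_fields = ["status", "Masterid", "SYSTEMID", "control_monitorkey"]

keys = covering_index_keys(match_fields, group_fields)
for key in keys:
    print(key)

# Assumed usage with pymongo (not run here):
#   collection.create_index(keys)
```

Whether an index this wide is worth its size depends on the cardinality of the fields and on how much of it stays in the cache.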
