Update in bulk, insert if record missing

The following records are already inserted in a collection:
[{"_id": 1, "field1": "abc"}, {"_id": 2, "field1": "xyz"}]

I want to process the following list in such a way that if the _id already exists, the record is updated; otherwise, it is inserted as a fresh record.

[{"_id" : 1, “field2” : “pqr”}, {"_id" : 3, “field2” : “tpr”}]

I could do this in a loop using update, but is there a way to do this in bulk?

Thanks

Hello @cmukesh19 : )

The code below replaces the documents completely. I am not sure exactly what you want to do, but it is almost the same code whether you want to replace a document or just update/add fields.

{"newRoot": existingDoc} => replace with the new document
{"newRoot": {"$mergeObjects": ["$$ROOT", existingDoc]}} => update or add fields

The command in the shell session below does that, and it is also a bulk update (here with 2 updates). If you look at the command, "updates" is an array; you can put many updates in it (here only 2). Use a loop and one update-building function that takes the document as its argument to generate one big bulk update command, as in the sketch below.
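
For illustration, a minimal Python (pymongo) sketch of that loop; the `docs` list and the local connection are assumptions for the example, and pipeline-form updates need MongoDB >= 4.2:

from pymongo import MongoClient

client = MongoClient()  # assumed local default connection
db = client["testdb"]

# Hypothetical input: the documents to upsert.
docs = [{"_id": 1, "field1": "existingID"}, {"_id": 3, "field1": "newID"}]

# One update clause per document, all sent in a single "update" command.
# (pymongo accepts a plain dict here; Python 3.7+ preserves key order,
# so "update" stays the first key, as the command requires.)
updates = [
    {"q": {"_id": doc["_id"]},
     "u": [{"$replaceRoot": {"newRoot": doc}}],
     "upsert": True,
     "multi": True}
    for doc in docs
]
db.command({"update": "testcoll", "updates": updates})

The shell session below issues the same command directly, with the 2 updates written out: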

> use testdb
switched to db testdb
> 
> db.testcoll.drop()
true
> 
> db.testcoll.insert([{"_id":1,"field1":"abc"},{"_id":2,"field1":"xyz"}]);
BulkWriteResult({
	"writeErrors" : [ ],
	"writeConcernErrors" : [ ],
	"nInserted" : 2,
	"nUpserted" : 0,
	"nMatched" : 0,
	"nModified" : 0,
	"nRemoved" : 0,
	"upserted" : [ ]
})
> 
> db.testcoll.find();
{ "_id" : 1, "field1" : "abc" }
{ "_id" : 2, "field1" : "xyz" }
> 
> var existingDoc={"_id":1,"field1":"existingID"};
> var newDoc={"_id":3,"field1":"newID"};
> 
> db.runCommand({"update":"testcoll","updates":[
... 
... {"q":{"_id":existingDoc["_id"]},"u":[{"$replaceRoot":{"newRoot":existingDoc}}],"upsert":true,"multi":true},
... 
... {"q":{"_id":newDoc["_id"]},"u":[{"$replaceRoot":{"newRoot":newDoc}}],"upsert":true,"multi":true}]}
... 
... );     
{
	"n" : 2,
	"nModified" : 1,
	"upserted" : [
		{
			"index" : 1,
			"_id" : 3
		}
	],
	"ok" : 1
}
>                
> db.testcoll.find();
{ "_id" : 1, "field1" : "existingID" }
{ "_id" : 2, "field1" : "xyz" }
{ "_id" : 3, "field1" : "newID" }

Only documents that satisfy a query enter the pipeline; with upsert, when nothing satisfies the query, the query document itself enters the pipeline.
id 1 was updated (it satisfied the first query, so it entered)
id 2 wasn't changed (it satisfied no query, so it didn't enter)
id 3 was inserted (it satisfied no query and didn't exist, so its query document entered)

And whatever enters the pipeline has its root replaced with the new document.
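
If instead you want the update/add-fields behavior from the $mergeObjects variant above, only the "u" part changes; a sketch reusing the names from the earlier example:

# Merge the new fields into the stored document instead of replacing it
# ($$ROOT is the existing document; doc supplies the new/overriding fields).
updates = [
    {"q": {"_id": doc["_id"]},
     "u": [{"$replaceRoot": {"newRoot": {"$mergeObjects": ["$$ROOT", doc]}}}],
     "upsert": True,
     "multi": True}
    for doc in docs
]
db.command({"update": "testcoll", "updates": updates})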

Hope it helps : )

I have found a way which seems to work fine. Posting it here for the benefit of others. Although this is still not the SQL equivalent of a join-and-update, and it still needs an update for each record individually, the performance was very good.

from pymongo import UpdateOne

update_list = []
for val in values:
    update_list.append(UpdateOne({'_id': val['_id']}, {'$set': {'xyz': val['xyz']}}, upsert=True))
collection.bulk_write(update_list)

Essentially, we have to build a list of individual UpdateOne operations and execute them in bulk using bulk_write (I was using Python in this example).
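
As a small aside, bulk_write returns a BulkWriteResult, so you can check what actually happened (variable names as in the snippet above):

result = collection.bulk_write(update_list)
# matched_count: documents found by the filters; modified_count: documents
# changed; upserted_count: documents inserted because nothing matched.
print(result.matched_count, result.modified_count, result.upserted_count)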

Hello

Updates can be done using the update operators (the $set that you used) or a pipeline (what I used). Pipeline updates are useful for more complicated updates.

If coll1 is on the driver (a Python list) and coll2 is on disk (a MongoDB collection), both code samples above work (pipeline or $set).

But if both collections are on disk (since you mentioned a join), you can merge them without pulling the data from disk to the driver and back to the database. For example, here testcoll1 is merged into testcoll2 (testcoll2 is updated). See the example below; maybe this is what you want.

> use testdb
switched to db testdb
> db.testcoll1.drop()
true
> db.testcoll2.drop()
true
> 
> db.testcoll1.insert([{"_id":1,"field1":"abc"},{"_id":2,"field1":"xyz"},{"_id":4,"field1":"xyz","field2":"kl"}]);
BulkWriteResult({
	"writeErrors" : [ ],
	"writeConcernErrors" : [ ],
	"nInserted" : 3,
	"nUpserted" : 0,
	"nMatched" : 0,
	"nModified" : 0,
	"nRemoved" : 0,
	"upserted" : [ ]
})
> 
> db.testcoll2.insert([{"_id":1,"field2":"pqr"},{"_id":3,"field2":"tpr"},{"_id":4,"field5":"po"}]);
BulkWriteResult({
	"writeErrors" : [ ],
	"writeConcernErrors" : [ ],
	"nInserted" : 3,
	"nUpserted" : 0,
	"nMatched" : 0,
	"nModified" : 0,
	"nRemoved" : 0,
	"upserted" : [ ]
})
> 
> db.testcoll1.find();
{ "_id" : 1, "field1" : "abc" }
{ "_id" : 2, "field1" : "xyz" }
{ "_id" : 4, "field1" : "xyz", "field2" : "kl" }
> 
> db.testcoll2.find();
{ "_id" : 1, "field2" : "pqr" }
{ "_id" : 3, "field2" : "tpr" }
{ "_id" : 4, "field5" : "po" }
> 
> 
> db.runCommand({"aggregate":"testcoll1","pipeline":[{"$merge":{"into":{"db":"testdb","coll":"testcoll2"},"on":["_id"],"whenMatched":"merge","whenNotMatched":"insert"}}],"maxTimeMS":0,"cursor":{}});
{
	"cursor" : {
		"firstBatch" : [ ],
		"id" : NumberLong(0),
		"ns" : "testdb.testcoll1"
	},
	"ok" : 1
}
> 
> 
> db.testcoll2.find();
{ "_id" : 1, "field2" : "pqr", "field1" : "abc" }
{ "_id" : 3, "field2" : "tpr" }
{ "_id" : 4, "field5" : "po", "field1" : "xyz", "field2" : "kl" }
{ "_id" : 2, "field1" : "xyz" }

MongoDB provides many things: joins ($lookup), unions, $merge, etc. I am just not sure exactly what you want; you might want to do this (if both collections are on disk).

To do this you need MongoDB >= 4.2. The runCommand above is an aggregation; if you take just the pipeline, it works with all drivers, as in the sketch below.
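
For instance, a minimal pymongo sketch of the same $merge pipeline (the connection and database name are assumptions for the example):

from pymongo import MongoClient

client = MongoClient()  # assumed local default connection
db = client["testdb"]

# Merge testcoll1 into testcoll2: match on _id, merge fields when a match
# exists, insert the document when it doesn't (requires MongoDB >= 4.2).
db["testcoll1"].aggregate([
    {"$merge": {
        "into": {"db": "testdb", "coll": "testcoll2"},
        "on": ["_id"],
        "whenMatched": "merge",
        "whenNotMatched": "insert",
    }}
])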