How to convert a String field to ObjectId in MongoSinkConnector


I’m trying to sink a Kafka topic using com.mongodb.kafka.connect.MongoSinkConnector. The messages follow this Avro definition:

record MyObjectType {
    string id;
    string userId;
}

I want the sink connector to treat the userId attribute as the _id (of type ObjectId) in the upserted MongoDB document.

That is, given this Avro message:

{
    "id" : "60a50547e578c87ac72f1042",
    "userId" : "60a50547e578c87ac72f1041"
}

I want the following MongoDB document:

{
    "_id" : ObjectId("60a50547e578c87ac72f1041"),
    "id" : "60a50547e578c87ac72f1042",
    "userId" : "60a50547e578c87ac72f1041"
}

I have made various attempts, e.g. using SMTs to reshape the incoming Avro message so that it resembles the extended JSON format { "_id" : { "$oid" : "60a50547e578c87ac72f1041" } }.
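To make the target shape concrete, here is a minimal sketch in plain Java of the string-level transformation I'm after. OidSketch, isObjectIdHex and toExtendedJsonId are hypothetical names made up for illustration; they are not part of the MongoDB Kafka connector or of any SMT, and the sketch only builds the extended JSON text rather than a real ObjectId.

```java
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not a connector or SMT API.
final class OidSketch {
    // An ObjectId is 12 bytes, so its hex form is exactly 24 hexadecimal characters.
    private static final Pattern OBJECT_ID_HEX = Pattern.compile("[0-9a-fA-F]{24}");

    // Returns true when the value could be parsed as an ObjectId hex string.
    static boolean isObjectIdHex(String value) {
        return value != null && OBJECT_ID_HEX.matcher(value).matches();
    }

    // Builds the extended JSON text for an _id that wraps the given hex string in $oid.
    static String toExtendedJsonId(String userId) {
        if (!isObjectIdHex(userId)) {
            throw new IllegalArgumentException("Not a 24-character hex string: " + userId);
        }
        return "{\"_id\": {\"$oid\": \"" + userId + "\"}}";
    }

    public static void main(String[] args) {
        System.out.println(toExtendedJsonId("60a50547e578c87ac72f1041"));
    }
}
```

The point is that the key has to be "$oid" (with the dollar sign) and the value has to be a valid 24-character hex string; whether the connector then interprets that structure as an ObjectId is exactly the part I have not been able to confirm.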

The closest I’ve got is with the connector configuration below:

"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schemas.enable": true,
"": "",
"transforms.RenameField_userId.type": "org.apache.kafka.connect.transforms.ReplaceField$Key",
"transforms.RenameField_userId.renames": "userId:oid",
"transforms.HoistField_id.type": "org.apache.kafka.connect.transforms.HoistField$Key",
"transforms.HoistField_id.field": "_id"

With that configuration, the following document is inserted into MongoDB:

{
    "_id" : {
        "oid" : "60a50547e578c87ac72f1041"
    },
    "id" : "60a50547e578c87ac72f1042",
    "userId" : "60a50547e578c87ac72f1041"
}

Some notes:

  • Ideally, I would also like id and userId to be converted to ObjectId in the target collection.
  • I can change the structure of the Avro message if needed, but I cannot change the fact that the resulting _id must be an ObjectId.
  • The only ID strategy I’ve found that works with ObjectId is BsonOidStrategy, but it generates a new random ObjectId rather than one based on a value in the message.

Any suggestions to accomplish this would be greatly appreciated!