How to convert a String field to ObjectId in MongoSinkConnector

Hi,

I’m trying to sink a Kafka topic to MongoDB using com.mongodb.kafka.connect.MongoSinkConnector. The messages follow this Avro definition:

record MyObjectType {
    string id;
    string userId;
}

I want the sink connector to use the userId attribute as the _id (of type ObjectId) of the upserted MongoDB document.

For example, given this Avro message:

{
    "id" : "60a50547e578c87ac72f1042",
    "userId" : "60a50547e578c87ac72f1041"
}

I want the following MongoDB document:

{
    "_id" : ObjectId("60a50547e578c87ac72f1041"),
    "id" : "60a50547e578c87ac72f1042",
    "userId" : "60a50547e578c87ac72f1041"
}

I have made various attempts, e.g. using SMTs to reshape the incoming Avro message into something like the Extended JSON form { "_id" : { "$oid" : "60a50547e578c87ac72f1041" } }.

The closest I’ve got is with the following connector configuration:

"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://local-kafka-schema-registry-cp-schema-registry:8081",
"value.converter.schemas.enable": true,
"document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInKeyStrategy",
"transforms":"createKey,RenameField_userId,HoistField_id",
"transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields":"userId",
"transforms.RenameField_userId.type": "org.apache.kafka.connect.transforms.ReplaceField$Key",
"transforms.RenameField_userId.renames": "userId:oid",
"transforms.HoistField_id.type": "org.apache.kafka.connect.transforms.HoistField$Key",
"transforms.HoistField_id.field": "_id"

With that configuration, this document is inserted into MongoDB:

{
    "_id" : {
        "oid" : "60a50547e578c87ac72f1041"
    },
    "id" : "60a50547e578c87ac72f1042",
    "userId" : "60a50547e578c87ac72f1041"
}

Some notes:

  • Ideally, I would also like id and userId to be converted to ObjectId in the target collection.
  • I can change the structure of the Avro schema if needed, but I cannot change the fact that the resulting _id must be an ObjectId.
  • The only document.id.strategy I’ve found that works with ObjectId is BsonOidStrategy, but it generates a new ObjectId instead of deriving one from a value in the message.
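
One direction I’m considering, if nothing built-in fits, is a custom document.id.strategy that reads userId from the value and returns it as an ObjectId. As far as I can tell, the connector’s IdStrategy interface and the org.bson.types.ObjectId(String) constructor would do the real work there. The sketch below is only the validation and decoding step I’d want in such a strategy, written against the JDK alone so it runs without the connector on the classpath. The class name ObjectIdHex and its methods are hypothetical, not part of the connector or the BSON library.

```java
// Hypothetical helper (not part of the MongoDB Kafka connector or org.bson):
// checks that a string is a well-formed ObjectId and decodes it to raw bytes.
final class ObjectIdHex {
    private ObjectIdHex() {}

    /** Returns true if s is exactly 24 ASCII hexadecimal characters. */
    static boolean isValid(String s) {
        if (s == null || s.length() != 24) {
            return false;
        }
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            boolean hex = (c >= '0' && c <= '9')
                    || (c >= 'a' && c <= 'f')
                    || (c >= 'A' && c <= 'F');
            if (!hex) {
                return false;
            }
        }
        return true;
    }

    /** Decodes the 24 hex characters into the 12 bytes an ObjectId stores. */
    static byte[] toBytes(String s) {
        if (!isValid(s)) {
            throw new IllegalArgumentException("not a 24-character hex string: " + s);
        }
        byte[] out = new byte[12];
        for (int i = 0; i < out.length; i++) {
            int hi = Character.digit(s.charAt(2 * i), 16);
            int lo = Character.digit(s.charAt(2 * i + 1), 16);
            out[i] = (byte) ((hi << 4) | lo);
        }
        return out;
    }
}
```

In an actual strategy I would expect to replace toBytes with the BSON library’s own ObjectId constructor and keep only the check, so that a malformed userId is rejected with a clear error instead of ending up as a string _id.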

Any suggestions to accomplish this would be greatly appreciated!