How to convert a String field to ObjectId in MongoSinkConnector

Hi,

I’m trying to sink a Kafka topic using com.mongodb.kafka.connect.MongoSinkConnector, and the messages follow this Avro definition:

record MyObjectType {
    string id;
    string userId;
}

What I want to do is to make the sink connector treat the userId attribute as the _id (of type ObjectId) in the upserted MongoDB document.

i.e. given avro message:

{
    "id" : "60a50547e578c87ac72f1042",
    "userId" : "60a50547e578c87ac72f1041"
}

I want the below MongoDB document

{
    "_id" : ObjectId("60a50547e578c87ac72f1041")
    "id" : "60a50547e578c87ac72f1042",
    "userId" : "60a50547e578c87ac72f1041"
}

I have made various attempts, e.g. using SMTs to reshape the incoming Avro message into something close to the extended JSON format { _id : { $oid: "60a50547e578c87ac72f1041" } }.

The closest I’ve got is using the below connector configuration

"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://local-kafka-schema-registry-cp-schema-registry:8081",
"value.converter.schemas.enable": true,
"document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInKeyStrategy",
"transforms":"createKey,RenameField_userId,HoistField_id",
"transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields":"userId",
"transforms.RenameField_userId.type": "org.apache.kafka.connect.transforms.ReplaceField$Key",
"transforms.RenameField_userId.renames": "userId:oid",
"transforms.HoistField_id.type": "org.apache.kafka.connect.transforms.HoistField$Key",
"transforms.HoistField_id.field": "_id"

Then I get this document inserted into MongoDB

{
    "_id" : {
        "oid" : "60a50547e578c87ac72f1041"
    },
    "id" : "60a50547e578c87ac72f1042",
    "userId" : "60a50547e578c87ac72f1041"
}

Some notes:

  • Ideally I would also like id and userId to be converted to ObjectId in the target collection.
  • I can change the structure of the Avro if needed, but I cannot change the fact that the resulting _id must be an ObjectId.
  • The only document.id.strategy I’ve found that works with ObjectId is the BsonOidStrategy, but that one generates a new random ObjectId rather than deriving it from a value in the message.

Any suggestions to accomplish this would be greatly appreciated!

You need to use the DocumentIdAdder post-processor. It looks like you are on the right track above; I’m not sure you need all those transforms to make it happen.
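
For reference, here is a rough, untested sketch of that approach (field names taken from the question above; the DocumentIdAdder applies whatever document.id.strategy is configured, so no SMTs are needed to copy userId into _id):

"post.processor.chain": "com.mongodb.kafka.connect.sink.processor.DocumentIdAdder",
"document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy",
"document.id.strategy.overwrite.existing": "true",
"document.id.strategy.partial.value.projection.list": "userId",
"document.id.strategy.partial.value.projection.type": "AllowList"

Note that, as far as I can tell, PartialValueStrategy still produces _id as a sub-document wrapping the string value rather than an ObjectId, so on its own this does not fully solve the original question.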

I have the same problem, and I cannot find a solution in the post-processors. Can you help me?


I have a similar problem.
I want to convert userId in the Kafka message to an ObjectId in the MongoDB Kafka sink connector.

I have read the post-processors docs, but cannot find a solution using the built-in processors.
Most of them change the key or value without changing the data type.

Should be something like

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy
post.processor.chain=com.mongodb.kafka.connect.sink.processor.DocumentIdAdder
document.id.strategy.overwrite.existing=true
document.id.strategy.partial.key.projection.list=userId
document.id.strategy.partial.key.projection.type=AllowList

I’m willing to use the default document id strategy.
My concern is not _id.

There are some fields that are saved as strings in the Kafka topic, but their actual values are ObjectIds.
For example, a Kafka message contains commentId and userId, where userId represents who has updated the comment.
And in our database, we have user and comment collections.
How can we convert such values to ObjectId in the MongoDB Kafka sink connector?

If the sink record is a string then it’s parsed as BSON, e.g. if you use extended JSON.

com.mongodb.kafka.connect.source.json.formatter.ExtendedJson
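
For example, a sink record whose string value is the extended JSON below would (assuming it is parsed as described) end up with a real ObjectId for _id; the value shown is just the userId from the question above:

{ "_id": { "$oid": "60a50547e578c87ac72f1041" }, "userId": "60a50547e578c87ac72f1041" }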

You are using Avro, so this might not work well. In that case you would need to write your own post-processor to convert the SinkDocument _id into a BSON ObjectId.
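
For anyone going that route, here is a rough, untested sketch of such a post-processor (the class name StringIdToObjectId is made up; PostProcessor, SinkDocument and MongoSinkTopicConfig are the connector’s own types). It swaps a string _id for a BsonObjectId whenever the string is a valid ObjectId hex value:

import org.apache.kafka.connect.sink.SinkRecord;
import org.bson.BsonObjectId;
import org.bson.types.ObjectId;

import com.mongodb.kafka.connect.sink.MongoSinkTopicConfig;
import com.mongodb.kafka.connect.sink.converter.SinkDocument;
import com.mongodb.kafka.connect.sink.processor.PostProcessor;

public class StringIdToObjectId extends PostProcessor {

    public StringIdToObjectId(final MongoSinkTopicConfig config) {
        super(config);
    }

    @Override
    public void process(final SinkDocument doc, final SinkRecord orig) {
        doc.getValueDoc().ifPresent(valueDoc -> {
            // _id was added earlier in the chain (e.g. by the DocumentIdAdder) as a string;
            // replace it with a real ObjectId when it is a valid 24-character hex value.
            if (valueDoc.containsKey("_id") && valueDoc.get("_id").isString()) {
                String hex = valueDoc.getString("_id").getValue();
                if (ObjectId.isValid(hex)) {
                    valueDoc.put("_id", new BsonObjectId(new ObjectId(hex)));
                }
            }
        });
    }
}

It would then be registered via the chain, e.g. "post.processor.chain": "com.mongodb.kafka.connect.sink.processor.DocumentIdAdder,com.example.StringIdToObjectId" (package name hypothetical), with the compiled jar on the Connect worker’s plugin path.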

Thanks for your reply.
I’m using JsonConverter.

I am facing the same issue.
Can someone please help here if you were able to resolve this issue?

Thank you