Hi,
I’m trying to sink a Kafka topic into MongoDB using com.mongodb.kafka.connect.MongoSinkConnector, and the messages follow this Avro definition:
record MyObjectType {
  string id;
  string userId;
}
What I want is for the sink connector to use the userId attribute as the _id (of type ObjectId) of the upserted MongoDB document.
For example, given this Avro message:
{
"id" : "60a50547e578c87ac72f1042",
"userId" : "60a50547e578c87ac72f1041"
}
I want the following MongoDB document:
{
"_id" : ObjectId("60a50547e578c87ac72f1041"),
"id" : "60a50547e578c87ac72f1042",
"userId" : "60a50547e578c87ac72f1041"
}
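To pin down the mapping, here is a minimal sketch in plain Python of the transformation I’m after, outside of Kafka Connect. ObjectIdValue and to_target_document are hypothetical names; ObjectIdValue only stands in for the driver’s ObjectId type (12 bytes, written as 24 hex characters) and is not part of the connector.

```python
import re
from dataclasses import dataclass

# 24 hexadecimal characters encode the 12 bytes of a BSON ObjectId.
_HEX24 = re.compile(r"[0-9a-fA-F]{24}")


@dataclass(frozen=True)
class ObjectIdValue:
    """Hypothetical stand-in for bson.ObjectId: just the 12 raw bytes."""
    raw: bytes

    @classmethod
    def from_hex(cls, hex_str: str) -> "ObjectIdValue":
        # Reject anything that is not exactly 24 hex characters.
        if not _HEX24.fullmatch(hex_str):
            raise ValueError(f"not a valid ObjectId hex string: {hex_str!r}")
        return cls(bytes.fromhex(hex_str))

    def __str__(self) -> str:
        return f'ObjectId("{self.raw.hex()}")'


def to_target_document(message: dict) -> dict:
    """Keep the Avro fields as-is and add userId, parsed as an ObjectId, as _id."""
    return {"_id": ObjectIdValue.from_hex(message["userId"]), **message}


msg = {"id": "60a50547e578c87ac72f1042", "userId": "60a50547e578c87ac72f1041"}
doc = to_target_document(msg)
print(doc["_id"])  # ObjectId("60a50547e578c87ac72f1041")
```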
I have made various attempts, e.g. using SMTs to reshape the incoming Avro message into something resembling MongoDB Extended JSON: { "_id": { "$oid": "60a50547e578c87ac72f1041" } }
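For reference, in MongoDB Extended JSON an ObjectId is written as {"$oid": "<24 hex characters>"}. A minimal sketch of producing that shape as a string (plain Python; extended_json_key is a hypothetical helper, not an SMT):

```python
import json


def extended_json_key(user_id: str) -> str:
    """Build a key string in Extended JSON form, where {"$oid": ...} denotes an ObjectId."""
    return json.dumps({"_id": {"$oid": user_id}})


print(extended_json_key("60a50547e578c87ac72f1041"))
# {"_id": {"$oid": "60a50547e578c87ac72f1041"}}
```

The $ matters: only a parser that understands Extended JSON treats $oid as an ObjectId, while a plain nested field named oid is just an ordinary string field.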
The closest I’ve got is the following connector configuration:
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://local-kafka-schema-registry-cp-schema-registry:8081",
"value.converter.schemas.enable": true,
"document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInKeyStrategy",
"transforms": "createKey,RenameField_userId,HoistField_id",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "userId",
"transforms.RenameField_userId.type": "org.apache.kafka.connect.transforms.ReplaceField$Key",
"transforms.RenameField_userId.renames": "userId:oid",
"transforms.HoistField_id.type": "org.apache.kafka.connect.transforms.HoistField$Key",
"transforms.HoistField_id.field": "_id"
With that, the following document is inserted into MongoDB, where _id is an embedded document with a string oid field rather than an ObjectId:
{
"_id" : {
"oid" : "60a50547e578c87ac72f1041"
},
"id" : "60a50547e578c87ac72f1042",
"userId" : "60a50547e578c87ac72f1041"
}
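To see why, here is a rough Python model of what the three SMTs do to the record key, step by step. Keys are modeled as plain dicts rather than Kafka Connect Structs, and the function names are hypothetical; they only mimic ValueToKey, ReplaceField$Key (renames) and HoistField$Key as configured above. ProvidedInKeyStrategy then takes the key’s _id field as the document _id (as I understand it).

```python
from __future__ import annotations


def value_to_key(value: dict, fields: list[str]) -> dict:
    """Like ValueToKey: build the key from the selected value fields."""
    return {f: value[f] for f in fields}


def rename_fields(key: dict, renames: dict[str, str]) -> dict:
    """Like ReplaceField$Key with 'renames': rename fields, keep values and types."""
    return {renames.get(k, k): v for k, v in key.items()}


def hoist_field(key: dict, field: str) -> dict:
    """Like HoistField$Key: wrap the whole key inside a single field."""
    return {field: key}


value = {"id": "60a50547e578c87ac72f1042", "userId": "60a50547e578c87ac72f1041"}
key = value_to_key(value, ["userId"])        # {"userId": <hex string>}
key = rename_fields(key, {"userId": "oid"})  # {"oid": <hex string>}
key = hoist_field(key, "_id")                # {"_id": {"oid": <hex string>}}
print(key)
```

None of these steps changes the field’s type, so the value under _id.oid is still a string, which matches the document above.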
Some notes:
- Ideally, I would also like id and userId to be converted to ObjectId in the target collection.
- I can change the structure of the Avro schema if needed, but the resulting _id must be an ObjectId.
- The only document.id.strategy I’ve found that produces an ObjectId is BsonOidStrategy, but it generates a new ObjectId instead of deriving one from a value in the message.
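The difference, sketched in plain Python: a freshly generated ObjectId (assuming the usual layout of a 4-byte timestamp, a 5-byte random value and a 3-byte counter) differs on every call, so the same message written twice would not land on the same _id, whereas an ObjectId parsed from userId is stable. new_object_id and object_id_from_field are hypothetical illustrations, not the BsonOidStrategy source.

```python
import itertools
import os
import time

# Per-process state, loosely following the ObjectId layout:
# 4-byte timestamp + 5-byte random value + 3-byte counter.
_PROCESS_RANDOM = os.urandom(5)
_COUNTER = itertools.count(int.from_bytes(os.urandom(3), "big"))


def new_object_id() -> bytes:
    """Fresh ObjectId bytes, roughly what a 'generate a new id' strategy does."""
    ts = int(time.time()).to_bytes(4, "big")
    counter = (next(_COUNTER) % (1 << 24)).to_bytes(3, "big")
    return ts + _PROCESS_RANDOM + counter


def object_id_from_field(message: dict, field: str) -> bytes:
    """ObjectId bytes taken from an existing 24-hex-character field of the message."""
    raw = bytes.fromhex(message[field])  # raises ValueError on non-hex input
    if len(raw) != 12:
        raise ValueError(f"{field} is not a 12-byte ObjectId")
    return raw


msg = {"id": "60a50547e578c87ac72f1042", "userId": "60a50547e578c87ac72f1041"}
print(new_object_id() == new_object_id())  # False: a new id per call
print(object_id_from_field(msg, "userId") == object_id_from_field(msg, "userId"))  # True
```

The same field-based parsing would apply to both id and userId for the conversion mentioned in the first note.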
Any suggestions to accomplish this would be greatly appreciated!