Kafka source - how to set partition key?

I’m using the Kafka source connector to build a change data capture pipeline from our legacy applications that store data in Mongo.

It is important to us that our sinks receive changes to a Mongo document in the right order (e.g. the insert first, followed by any updates, and possibly a delete last).

I believe the answer is to partition our topic based on documentId. That way a fast sink cannot process a document's delete before a slower sink has processed its insert.
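If I understand correctly, Kafka's default partitioner picks the partition from a hash of the record key, so records with the same key always land in the same partition. A simplified sketch of the idea (the real logic lives in the kafka-clients library):

```java
import org.apache.kafka.common.utils.Utils;

// Simplified sketch of Kafka's key-based partition assignment:
// the same key bytes always hash to the same partition, which is
// what preserves per-document ordering.
public class KeyPartitioningSketch {

    static int partitionFor(byte[] serializedKey, int numPartitions) {
        return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
    }

    public static void main(String[] args) {
        byte[] documentId = "635a1f0c9e8b4a2d7c1f0e3b".getBytes();
        // Every record keyed by this documentId goes to the same partition:
        System.out.println(partitionFor(documentId, 6));
    }
}
```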

But how do we configure the Kafka source connector to use documentId (or any other field) as the partition key?

Hi,

You are right that, in general, your change stream documents will only be ordered within one and the same Kafka topic partition. For the Kafka source connector scenario this means that the key of the Kafka Connect source record must be constructed properly.

My bad, I got this wrong in my first attempt at an answer because I was working on and reading several different things in parallel :see_no_evil: which I shouldn't do :grimacing:

Addendum: By default, however, the _id field will contain the change stream's resume token, NOT the original document's _id. This means you have to explicitly change the key on its way into Kafka. For this you could try to apply a suitable SMT configuration with any of the existing SMTs, and if that doesn't work (depending on your other connector settings) you could implement a custom SMT that lets you set the records' keys to the _id field you need.
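For context, a MongoDB change stream event looks roughly like this (values shortened for illustration); note that the top-level _id is the resume token, while the original document's _id sits under documentKey:

```json
{
  "_id": { "_data": "8263A8F1..." },
  "operationType": "insert",
  "documentKey": { "_id": { "$oid": "635a1f0c9e8b4a2d7c1f0e3b" } },
  "fullDocument": { "_id": { "$oid": "635a1f0c9e8b4a2d7c1f0e3b" }, "name": "example" },
  "ns": { "db": "mydb", "coll": "legacy" }
}
```

Assuming the record values reach the transforms as structured data rather than plain JSON strings (which depends on your output format settings), one possible chain of the built-in SMTs copies documentKey from the value into the key and then unwraps it down to the bare _id. A sketch only, not tested against your setup:

```json
{
  "transforms": "copyDocKey,unwrapDocKey,unwrapId",
  "transforms.copyDocKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
  "transforms.copyDocKey.fields": "documentKey",
  "transforms.unwrapDocKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
  "transforms.unwrapDocKey.field": "documentKey",
  "transforms.unwrapId.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
  "transforms.unwrapId.field": "_id"
}
```

If that route fails, a custom SMT boils down to implementing Kafka Connect's Transformation interface and returning a new record with the key you want. A minimal, schemaless skeleton (class name hypothetical, no Struct or null handling):

```java
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

// Hypothetical custom SMT that replaces the record key with the
// documentKey field taken from the record value.
public class DocumentIdAsKey<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public R apply(R record) {
        // Assumes a schemaless (Map) value; a real implementation must
        // also handle Struct values, missing fields and tombstones.
        Map<?, ?> value = (Map<?, ?>) record.value();
        Object newKey = value.get("documentKey");
        return record.newRecord(
                record.topic(), record.kafkaPartition(),
                null, newKey,                             // new key, no key schema
                record.valueSchema(), record.value(),
                record.timestamp());
    }

    @Override
    public ConfigDef config() { return new ConfigDef(); }

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void close() { }
}
```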

You can also verify what the actual key is for your specific configuration by inspecting the Kafka records in the target topic directly, e.g. with the Kafka (Avro) console consumer, which can print the key of each record alongside its value. That way you can see exactly what your record keys contain and whether it matches what you expect.
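For example (topic name and bootstrap server are placeholders; your distribution may name the script kafka-console-consumer.sh, and for Avro topics you would use kafka-avro-console-consumer with a schema.registry.url property instead):

```
kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic my.cdc.topic \
  --from-beginning \
  --property print.key=true \
  --property key.separator=" | "
```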

Hope this helps!
