Kafka Connect: source a collection and sink the resulting topic

Hi,

I would like to share collections between 3 microservices, and one of my candidate solutions is to use the MongoDB Kafka Connector: first source the collection into a dedicated topic, then sink that topic to save the data into each microservice's database. I don't know if this is the best solution, but how can I make sure the source publishes only the fullDocument while the sink still picks up deletes?

Any configuration suggestions?

Thank you for your help!


I am also looking for this. Did you find any solution?

There is an example of the Kafka Connector here: https://docs.mongodb.com/kafka-connector/master/kafka-docker-example/

Basically, your source configuration will look something like this:

curl -X POST -H "Content-Type: application/json" --data '
{"name": "mongo-source",
"config": {
"tasks.max":"1",
"connector.class":"com.mongodb.kafka.connect.MongoSourceConnector",
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable":false,
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable":false,
"publish.full.document.only": true,
"connection.uri":"mongodb://mongo1:27017,mongo2:27017,mongo3:27017",
"topic.prefix":"aprefix",
"database":"MyDB",
"collection":"SomeCollection"
}}' http://localhost:8083/connectors -w "\n"

The sink would look something like this:
curl -X POST -H "Content-Type: application/json" --data '
{"name": "mongo-atlas-sink",
"config": {
"connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
"tasks.max":"1",
"topics":"aprefix.MyDB.SomeCollection",
"connection.uri":"<>",
"database":"DestinationDBName",
"collection":"DestinationCollection",
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable":false,
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable":false
}}' http://localhost:8083/connectors -w "\n"
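
Once both are posted, you can check that they registered and started by querying the Connect REST API (same worker endpoint the examples above already use):

curl http://localhost:8083/connectors -w "\n"
curl http://localhost:8083/connectors/mongo-source/status -w "\n"
curl http://localhost:8083/connectors/mongo-atlas-sink/status -w "\n"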


@Robert_Walters Thanks for the response, but the problem with this approach is that the source connector generates an event stream in this format:

    {
      _id: { <BSON Object> },
      "operationType": "<operation>",
      "fullDocument": { <document> },
      "ns": {
        "db": <database>,
        "coll": <collection>
      },
      "to": {
        "db": <database>,
        "coll": <collection>
      },
      "documentKey": {
        _id: <value>
      },
      "updateDescription": {
        "updatedFields": { <document> },
        "removedFields": [ <field>, ... ]
      },
      "clusterTime": <Timestamp>,
      "txnNumber": <NumberLong>,
      "lsid": {
        "id": <UUID>,
        "uid": <BinData>
      }
    }

All we want from this format is the fullDocument. The sink connector has a change.data.capture.handler field that supports CDC using the Debezium connector.

But connecting the Debezium connector to MongoDB Atlas is another challenge that we are facing right now.

Is there a CDC handler for the MongoDB official source connector that we can use in the sink connector?

At this time you'll have to use the Debezium Connector for MongoDB as the source, since that connector produces MongoDB CDC events. The MongoDB Connector for Apache Kafka can then sink into MongoDB the CDC events sourced from the Debezium connectors for MongoDB, MySQL, and Postgres.
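
For reference, a sink set up to consume Debezium MongoDB CDC events would look roughly like the earlier sink example with the CDC handler added; the topic name and connection URI below are placeholders:

curl -X POST -H "Content-Type: application/json" --data '
{"name": "mongo-cdc-sink",
"config": {
"connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
"tasks.max":"1",
"topics":"<debezium topic>",
"connection.uri":"<>",
"database":"DestinationDBName",
"collection":"DestinationCollection",
"change.data.capture.handler":"com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler",
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable":false,
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable":false
}}' http://localhost:8083/connectors -w "\n"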

@Robert_Walters thanks for your response again.
We tried using the Debezium connector, but we get the error "Error while reading the 'shards' collection in the 'config' database: Timed out after 30000 ms while waiting to connect" while the connector is trying to connect to the MongoDB Atlas instance.

This Debezium connection issue was resolved by enabling SSL on the connector config.
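
For anyone hitting the same timeout, here is a rough sketch of a Debezium MongoDB source config with SSL enabled. The property names are from the Debezium 1.x MongoDB connector, and the hosts, logical name, and credentials are placeholders, so adjust them for your Atlas cluster:

curl -X POST -H "Content-Type: application/json" --data '
{"name": "debezium-mongo-source",
"config": {
"connector.class":"io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max":"1",
"mongodb.hosts":"<replicaSetName>/<host1:27017,host2:27017,host3:27017>",
"mongodb.name":"aprefix",
"mongodb.user":"<user>",
"mongodb.password":"<password>",
"mongodb.ssl.enabled":true,
"database.include.list":"MyDB",
"collection.include.list":"MyDB.SomeCollection"
}}' http://localhost:8083/connectors -w "\n"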

Update: starting in version 1.4 of the Kafka Connector, we added a CDC handler for MongoDB change stream events, so you can now apply change stream events to a sink. See details in this blog:
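
As a rough sketch, this reuses the sink example from earlier in the thread with the new change stream handler added (the connection URI is still a placeholder):

curl -X POST -H "Content-Type: application/json" --data '
{"name": "mongo-changestream-sink",
"config": {
"connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
"tasks.max":"1",
"topics":"aprefix.MyDB.SomeCollection",
"connection.uri":"<>",
"database":"DestinationDBName",
"collection":"DestinationCollection",
"change.data.capture.handler":"com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler",
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable":false,
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable":false
}}' http://localhost:8083/connectors -w "\n"

Note that, as far as I know, this handler needs the full change stream event on the topic (including operationType), so publish.full.document.only should be left false on the source in that setup.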
