MongoDB.live, free & fully virtual. June 9th - 10th. Register Now

MongoDB Kafka Sink connector - AvroConverter problem

Hi,

I am trying to configure Kafka Sink connector with avro schema value.
This is my configuration :

        {
"name": "mongodb_user_management",
 "config": {
     "connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
     "tasks.max":"1",
     "key.converter":"org.apache.kafka.connect.storage.StringConverter",
     "key.converter.schema.enable":"false",
     "value.converter":"io.confluent.connect.avro.AvroConverter",
     "value.converter.schema.registry.url":"http://127.0.0.1:8081",
     "connection.uri":"mongodb+srv://arkcase:password@cluster0-elrzn.mongodb.net/test?retryWrites=true&w=majority",
     "database":"UserDB",
     "collection":"users",
     "topics":"newuser",
     "database.history.kafka.bootstrap.servers":"http://127.0.0.1:9092"
     }
 }

As you can see I am using StringConverter for key and AvroConverter for Value.
I am sending avro serialized message whit this avro schema :

    {
      "name": "User",
      "namespace": "org.arkcase.avro",
      "type": "record"
      "fields": [
        {
          "name": "userId",
          "type": "string"
        }
        {
          "name": "name",
          "type": "string"
        },
        {
          "name": "lastName",
          "type": "string"
        },
    	{
          "name": "audit",
          "type": {
            "fields": [
              {
                "name": "userId",
                "type": "string"
              },
              {
                "default": "127.0.0.1",
                "name": "ipAddress",
                "type": "string"
              },
              {
                "name": "requestId",
                "type": "string"
              }
            ],
            "name": "Audit",
            "type": "record"
          }
        }
      ]
    }

I am loading the connector and everything is okey until the first message is sent.
After that connector is down with the following exception :

Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'user': was expecting ('true', 'false' or 'null')
 at [Source: (byte[])"user-001"; line: 1, column: 6]

Looks like avro converter is not working.

Any suggestions ?

Solved.
Looks like mongo sink connector had problem with the message key.
I was sending string key, but even if the key converter is String it requires at least valid json encoded string.
Solution :
transforms=WrapKey
transforms.WrapKey.type=org.apache.kafka.connect.transforms.HoistField$Key
transforms.WrapKey.field=_id

1 Like