Control Characters at the beginning of Kafka value

Hello,

I am using the MongoDB source connector to produce to a Kafka topic. I consume from this topic with a String serde and try to read each message value into a Jackson JsonNode in Java.

The problem is that every message arrives with control characters at the beginning of the string, which makes it invalid JSON. For example:

"]�!{"_id": {"_data": "samplevalue"}}"

I'm struggling to remove these characters from incoming Kafka messages. Did I miss a setting in the source connector configs? I'm looking for a way either to parse out my data or to strip these characters from the incoming messages.

I saw the thread discussing version 1.3 of the connector, which will allow us to produce directly in JSON format, but I'm looking for something to tide me over until it is released!

UPDATE: Here are my connector configs (after some sanitizing):

{
      "connector.class":"com.mongodb.kafka.connect.MongoSourceConnector",
      "tasks.max": "{{TASKS}}",
      "collection": "Profile",
      "database": "data",
      "pipeline": "[]",
      "topic.prefix": "mongodb",
      "poll.await.time.ms": "5000",
      "poll.max.batch.size": "1000",
      "publish.full.document.only": "false",
      "batch.size": "0",
      "change.stream.full.document": "updateLookup",
      "copy.existing": "true",
      "connection.uri": "mongodb://{[user]}:{[pass]}@{[host]}:{[port]},{[host2]}:{[port]},{[host3]}:{[port]}"
}

Thanks in advance!
Nick

What converter are you using at the source? Can you provide your source configuration settings?

Hey Robert!

I have updated my post to include my connector configs. The closest thing to a converter configuration I have seen in the docs was 'collation', and I wasn't entirely sure whether I needed it for my use case.

I think you should try telling the connector which converter to use when serializing data onto the Kafka topic. This is done through the key.converter and value.converter parameters, which you can add to your connector config:

 "key.converter":"org.apache.kafka.connect.json.JsonConverter",
 "key.converter.schemas.enable":false,
 "value.converter":"org.apache.kafka.connect.json.JsonConverter",
 "value.converter.schemas.enable":false,
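
If changing the converters is not possible right away, a consumer-side stopgap along these lines might hold you over. This is only a rough sketch: `JsonPrefixStripper` and `stripLeadingNonJson` are hypothetical names, not part of Kafka or the MongoDB connector. It assumes the real payload is a JSON object and that the unwanted prefix never contains a `{` character itself, which is not guaranteed for arbitrary binary prefixes.

```java
final class JsonPrefixStripper {

    private JsonPrefixStripper() {
        // Utility class, not meant to be instantiated.
    }

    // Hypothetical helper: returns the text from the first '{' onward.
    // Assumption: the payload is a JSON object and the prefix holds no '{'.
    // The input is returned unchanged when it is null, already starts
    // with '{', or contains no '{' at all.
    static String stripLeadingNonJson(String raw) {
        if (raw == null) {
            return null;
        }
        int start = raw.indexOf('{');
        return start <= 0 ? raw : raw.substring(start);
    }

    public static void main(String[] args) {
        // Simulated message value with a few leading control characters.
        String raw = "\u0001\u0002]!{\"_id\": {\"_data\": \"samplevalue\"}}";
        System.out.println(stripLeadingNonJson(raw));
    }
}
```

The cleaned string could then be handed to Jackson's `ObjectMapper.readTree`. Fixing the converter at the source is still the cleaner option, since this approach only hides the mismatch between how the data is written and how it is read.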
