Kafka Source Connector, Kafka Schema Registry and KSQL - Schema inconsistencies

Hi,

I’m currently integrating MongoDB’s Kafka Source Connector with a Confluent Kafka cluster. The source connector sends the change stream event data from my database into Kafka; however, I would like to know how I can integrate this connector with Schema Registry.

My setup uses Kafka from a Confluent server, and I have a Docker container with KSQL and Kafka Connect embedded. This Kafka Connect instance currently only has the MongoDB Source Connector.

This is the connector.properties file I use to configure Kafka Connect:

# Generic Connector Configs
group.id=ksql-connect-cluster
bootstrap.servers=confluent:broker:uri
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="USERNAME" password="PASSWORD";
producer.ssl.endpoint.identification.algorithm=https
producer.sasl.mechanism=PLAIN
producer.request.timeout.ms=20000
producer.retry.backoff.ms=500
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="USERNAME" password="Password";
producer.security.protocol=SASL_SSL
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
value.converter.enhanced.avro.schema.support=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
# Schema Registry credentials
value.converter.schema.registry.url=https://schemaregistryurl
value.converter.basic.auth.credentials.source=USER_INFO
value.converter.schema.registry.basic.auth.user.info=USERNAME:PASSWORD

This is how I set up the MongoDB Source Connector properties:

[image: MongoDB Source Connector properties]

I configured the converters to use the AvroConverter and also provided the credentials for the Schema Registry. However, when I check the Kafka topic the events are sent to, instead of the schema of the change stream event data, Confluent Schema Registry shows me the following schema:

We want to use KSQL to apply transformations to the messages flowing through the topic that receives these change stream events. However, when I try to create a stream listening to one of these topics, I receive the following error message:

The schema of the full document sent in these change stream events is extremely complex, with many levels of nested objects and arrays, so setting these schemas up in AVRO manually would be very hard and error prone. We therefore wanted to use KSQL schema inference to create these streams. This is currently not possible due to the error displayed above, which leads me to believe the problem may be in how we’re setting up our connector and, consequently, how we’re creating our topics and their respective AVRO schemas.
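For reference, this is roughly the statement I’m trying to run, relying on KSQL schema inference from Schema Registry instead of listing every column by hand (the stream and topic names here are just placeholders):

-- No explicit column list: KSQL should infer the value schema from Schema Registry
CREATE STREAM change_events
  WITH (KAFKA_TOPIC='mongo.mydb.mycollection', VALUE_FORMAT='AVRO');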

Our goal here would be to have an AVRO schema compatible with our change stream events. Is it possible to achieve this automatically through the MongoDB Source Connector, or will I have to create the schemas manually so I can use KSQL schema inference?

In the current version of the connector the output of the source is always a string, so you won’t be able to use the Schema Registry. However, in the next version of the connector we will support outputting to a schema. There is a snapshot build of the connector with this support, and a demo, here: GitHub - RWaltersMA/kafka1.3 (the Financial Securities demo shows data flowing from MySQL and MongoDB via Kafka Connect into Kafka topics, and showcases various improvements in the MongoDB Connector for Apache Kafka v1.3). It is not feature complete, but it should provide some guidance as far as the direction of the connector.

Also, have you considered setting publish.full.document.only=true? This will push only the document itself to the message, rather than all the change stream metadata.
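For example, a rough sketch of the source connector properties could look like the following (connection details and names are placeholders, and output.format.value only applies once you are on the 1.3 snapshot build mentioned above):

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=mongodb+srv://USERNAME:PASSWORD@cluster.example.mongodb.net
database=mydb
collection=mycollection
topic.prefix=mongo
# push only the document itself, not the whole change stream envelope
publish.full.document.only=true
# 1.3 snapshot: emit values with a schema instead of a plain JSON string
output.format.value=schema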

Hi Mongo Team,

Thank you so much for the 1.3 release. The new features are exactly what we needed to move forward with the integration between our database and our Kafka infrastructure.

If I may give some feedback:

  • The configuration option output.schema.value is very important, but its usability can be frustrating. In our case we work with complex schemas, and having to write them down as a single string is very error prone and difficult to debug when those errors do happen (see the sketch below).
    My suggestion would be to allow specifying a schema by giving the path to a schema file; I would suggest supporting at least .json and .avsc files.
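To illustrate the pain point, this is roughly what that looks like today, even for a trivial two-field value (a hypothetical, heavily simplified example compared to our real documents):

output.format.value=schema
# the whole Avro schema has to be embedded as a single string value
output.schema.value={"type": "record", "name": "ChangeStreamDocument", "fields": [{"name": "_id", "type": "string"}, {"name": "amount", "type": "double"}]}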

Again, thank you for this very important release.
Miguel Azevedo
