MongoDB.live, free & fully virtual. Register Now MongoDB.live, free & fully virtual. Register Now

Kafka Source Connector (v1.3) - 'output.schema.key' configuration

Hi there,

I updated my Mongo Source Connector to the new 1.3 version and I’m currently facing an issue with the output.schema.key configuration.

Here’s my connector.properties file:

# Connect worker
group.id=ksql-connect-cluster
bootstrap.servers=confluentserver:9092
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config= org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="username" \
    password="password";
# Embedded producer for source connectors
producer.ssl.endpoint.identification.algorithm=https
producer.sasl.mechanism=PLAIN
producer.request.timeout.ms=20000
producer.retry.backoff.ms=500
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="username" \
    password="password";
producer.security.protocol=SASL_SSL
# Embedded consumer for sink connectors
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="username" \
  password="password";
#
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schemas.enable=true
value.converter.enhanced.avro.schema.support=true
# Schema Registry credentials
key.converter.schema.registry.url=https://schemaregistry.confluent.cloud
key.converter.basic.auth.credentials.source=USER_INFO
key.converter.schema.registry.basic.auth.user.info=username/password
value.converter.schema.registry.url=https://schemaregistry.confluent.cloud
value.converter.basic.auth.credentials.source=USER_INFO
value.converter.schema.registry.basic.auth.user.info=username/password

# Connector topic configurations
config.storage.topic=ksql-connect-configs
offset.storage.topic=ksql-connect-offsets
status.storage.topic=ksql-connect-statuses
offset.storage.partitions=1
status.storage.partitions=1
config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
plugin.path=/lib/plugins

And here’s an example of my connector configuration:

create source connector `staging-example` with (
    "connector.class" = 'com.mongodb.kafka.connect.MongoSourceConnector',
    "connection.uri" = 'mongodb://mongo0:30000,mongo1:30001,mongo2:30002/?replicaSet=rs0',
    "database" = 'staging',
    "collection" = 'example',
    "topic.prefix" = 'sk',
    "change.stream.full.document" = 'updateLookup',
    "copy.existing" = 'true',
    "output.format.key" = 'schema',
    "output.schema.key" = '{"name":"idType","type":"record","fields":[{"name": "fullDocument","type": "record","fields":[{"name":"_id","type": "string"}]}]}',
    "output.format.value" = 'schema',
    "output.schema.value"= '{"name":"ChangeStream","type":"record","fields":[{"name":"_id","type":"string"},{"name":"operationType","type":["string","null"]},{"name":"fullDocument","type":{"name":"fullDocument_record","type":"record","fields":[{"name":"status","doc":"","type":["null","string"],"default":null},{"name":"color","doc":"","type":["null","string"],"default":null},{"name":"_id","doc":"","type":["null","string"],"default":null}]}}]}'
);

I want my output.schema.key to have the same value as my fullDocument._id field, and I based this configuration in the example displayed in this article: https://www.mongodb.com/blog/post/mongo-db-connector-for-apache-kafka-1-3-available-now

However, my data is not being streamed with the content I want. Here’s an example of what my key is being set as:

image

By following the article mentioned above, more specifically, the ’ Write Data to Specific Partitions’ section, the key should be receiving the value of the field I configured on output.schema.key.

Can you help me figuring out if I’m doing something wrong or if I’m missing any configuration?

Thanks in advance,
Miguel Azevedo

instead of “change.stream.full.document” = ‘updateLookup’ use
“publish.full.document.only”: true,

I think the change stream definition should be this:

output.schema.key : { “fields”: [ {“name”: “fullDocument”,
“type”: {“name”: “fullDocument”,“type”: “record”,
“fields”: [ { “name”:"_id",“type”: “string”} ]
}
}] }