Hi there,
I updated my Mongo Source Connector to the new 1.3 version and I’m currently facing an issue with the output.schema.key
configuration.
Here’s my connector.properties
file:
# Connect worker
group.id=ksql-connect-cluster
bootstrap.servers=confluentserver:9092
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config= org.apache.kafka.common.security.plain.PlainLoginModule required \
username="username" \
password="password";
# Embedded producer for source connectors
producer.ssl.endpoint.identification.algorithm=https
producer.sasl.mechanism=PLAIN
producer.request.timeout.ms=20000
producer.retry.backoff.ms=500
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="username" \
password="password";
producer.security.protocol=SASL_SSL
# Embedded consumer for sink connectors
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="username" \
password="password";
#
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schemas.enable=true
value.converter.enhanced.avro.schema.support=true
# Schema Registry credentials
key.converter.schema.registry.url=https://schemaregistry.confluent.cloud
key.converter.basic.auth.credentials.source=USER_INFO
key.converter.schema.registry.basic.auth.user.info=username/password
value.converter.schema.registry.url=https://schemaregistry.confluent.cloud
value.converter.basic.auth.credentials.source=USER_INFO
value.converter.schema.registry.basic.auth.user.info=username/password
# Connector topic configurations
config.storage.topic=ksql-connect-configs
offset.storage.topic=ksql-connect-offsets
status.storage.topic=ksql-connect-statuses
offset.storage.partitions=1
status.storage.partitions=1
config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
plugin.path=/lib/plugins
And here’s an example of my connector configuration:
create source connector `staging-example` with (
"connector.class" = 'com.mongodb.kafka.connect.MongoSourceConnector',
"connection.uri" = 'mongodb://mongo0:30000,mongo1:30001,mongo2:30002/?replicaSet=rs0',
"database" = 'staging',
"collection" = 'example',
"topic.prefix" = 'sk',
"change.stream.full.document" = 'updateLookup',
"copy.existing" = 'true',
"output.format.key" = 'schema',
"output.schema.key" = '{"name":"idType","type":"record","fields":[{"name": "fullDocument","type": "record","fields":[{"name":"_id","type": "string"}]}]}',
"output.format.value" = 'schema',
"output.schema.value"= '{"name":"ChangeStream","type":"record","fields":[{"name":"_id","type":"string"},{"name":"operationType","type":["string","null"]},{"name":"fullDocument","type":{"name":"fullDocument_record","type":"record","fields":[{"name":"status","doc":"","type":["null","string"],"default":null},{"name":"color","doc":"","type":["null","string"],"default":null},{"name":"_id","doc":"","type":["null","string"],"default":null}]}}]}'
);
I want my output.schema.key
to have the same value as my fullDocument._id field, and I based this configuration in the example displayed in this article: MongoDB Connector for Apache Kafka 1.3 Available Now | MongoDB Blog
However, my data is not being streamed with the content I want. Here’s an example of what my key is being set as:
By following the article mentioned above, more specifically, the ’ Write Data to Specific Partitions’ section, the key should be receiving the value of the field I configured on output.schema.key
.
Can you help me figuring out if I’m doing something wrong or if I’m missing any configuration?
Thanks in advance,
Miguel Azevedo