MongoDB Kafka Source Connector throws java.lang.IllegalStateException: Queue full when using copy.existing: true

When importing data from MongoDB to Kafka with the connector (https://github.com/mongodb/mongo-kafka), the source task fails with java.lang.IllegalStateException: Queue full.

I use copy.existing: true with the default copy.existing.queue.size of 16000. What value should I set instead? The collection to import into Kafka is 10 GB.

Environment:

mongo-kafka-connect: 1.0.0
Kafka: 2.4.0
Kafka-Connect: 2.4.0
MongoDB server: 3.6.14
mongodb-driver-sync: 3.12.1

Stacktrace:

org.apache.kafka.connect.errors.ConnectException: java.lang.IllegalStateException: Queue full
	at com.mongodb.kafka.connect.source.MongoCopyDataManager.poll(MongoCopyDataManager.java:95)
	at com.mongodb.kafka.connect.source.MongoSourceTask.getNextDocument(MongoSourceTask.java:301)
	at com.mongodb.kafka.connect.source.MongoSourceTask.poll(MongoSourceTask.java:154)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:265)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:232)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
	at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.IllegalStateException: Queue full
	at java.base/java.util.AbstractQueue.add(Unknown Source)
	at java.base/java.util.concurrent.ArrayBlockingQueue.add(Unknown Source)
	at com.mongodb.client.internal.Java8ForEachHelper.forEach(Java8ForEachHelper.java:30)
	at com.mongodb.client.internal.Java8AggregateIterableImpl.forEach(Java8AggregateIterableImpl.java:54)
	at com.mongodb.kafka.connect.source.MongoCopyDataManager.copyDataFrom(MongoCopyDataManager.java:123)
	at com.mongodb.kafka.connect.source.MongoCopyDataManager.lambda$new$0(MongoCopyDataManager.java:87)
	... 5 more
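
For anyone reading the trace: the "Caused by" frames show the exception coming from ArrayBlockingQueue.add, called while the copy thread iterates the aggregation results. The standalone sketch below is not connector code (the class and method names are made up for illustration). It only reproduces the JDK behaviour: add() on a full bounded queue throws IllegalStateException: Queue full, whereas offer() returns false and put() would block until space is free.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, unrelated to the connector's own classes.
public class QueueFullDemo {

    // Adds capacity + 1 elements with add(); returns the message of the
    // IllegalStateException raised on overflow, or null if none was thrown.
    public static String overflowWithAdd(int capacity) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        try {
            for (int i = 0; i <= capacity; i++) {
                queue.add(i); // throws once the queue already holds `capacity` items
            }
            return null;
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    // Fills the queue, then tries one more element with offer();
    // offer() reports failure through its return value instead of throwing.
    public static boolean offerWhenFull(int capacity) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        for (int i = 0; i < capacity; i++) {
            queue.add(i);
        }
        return queue.offer(capacity);
    }

    public static void main(String[] args) {
        System.out.println("add on full queue:   " + overflowWithAdd(3));
        System.out.println("offer on full queue: " + offerWhenFull(3));
    }
}
```

If the copy thread fills the queue faster than poll() drains it, a larger copy.existing.queue.size may only postpone the error rather than remove it, since the queue is bounded either way.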

FYI this issue is being discussed (and addressed) at KAFKA-85