Having blocking operations in Reactive application using MongoDB reactive drivers

Hi all, I am writing a REST API which is reactive, i.e using Reactive Mongo drivers.
All works fine when the concurrency level is 100, however increase the concurrency level to 200, blockhound reports blocking calls in Mongo.
I am using Mongod community on windows. Below is the error. I am using blockhound to detect blocking calls. This happens only when concurrency above 100.
Any suggestions on how to resolve this error?

-- error --
2021-01-02 09:52:17.399 ERROR 18688 --- [reactor-http-nio-2] org.mongodb.driver.operation             


: Callback onResult call produced an error.

reactor.blockhound.BlockingOperationError: Blocking call! jdk.internal.misc.Unsafe#park
	at java.base/jdk.internal.misc.Unsafe.park(Unsafe.java)
	at java.base/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
	at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:885)
	at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:917)
	at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1240)
	at java.base/java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:267)
	at java.base/java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:409)
	at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1347)
	at java.base/java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:118)
	at java.base/java.util.concurrent.Executors$DelegatedExecutorService.submit(Executors.java:714)
	at com.mongodb.internal.connection.DefaultConnectionPool.getAsync(DefaultConnectionPool.java:157)
	at com.mongodb.internal.connection.DefaultServer.getConnectionAsync(DefaultServer.java:105)
	at com.mongodb.internal.binding.AsyncClusterBinding$AsyncClusterBindingConnectionSource.getConnection(AsyncClusterBinding.java:131)
	at com.mongodb.internal.async.client.ClientSessionBinding$SessionBindingAsyncConnectionSource.getConnection(ClientSessionBinding.java:140)
	at com.mongodb.internal.operation.OperationHelper.withAsyncConnectionSource(OperationHelper.java:730)
	at com.mongodb.internal.operation.OperationHelper.access$200(OperationHelper.java:68)
	at com.mongodb.internal.operation.OperationHelper$AsyncCallableWithConnectionAndSourceCallback.onResult(OperationHelper.java:750)
	at com.mongodb.internal.operation.OperationHelper$AsyncCallableWithConnectionAndSourceCallback.onResult(OperationHelper.java:738)
	at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:48)
	at com.mongodb.internal.async.client.ClientSessionBinding$WrappingCallback.onResult(ClientSessionBinding.java:208)
	at com.mongodb.internal.async.client.ClientSessionBinding$WrappingCallback.onResult(ClientSessionBinding.java:196)
	at com.mongodb.internal.binding.AsyncClusterBinding$1.onResult(AsyncClusterBinding.java:105)
	at com.mongodb.internal.binding.AsyncClusterBinding$1.onResult(AsyncClusterBinding.java:99)
	at com.mongodb.internal.connection.BaseCluster$ServerSelectionRequest.onResult(BaseCluster.java:432)
	at com.mongodb.internal.connection.BaseCluster.handleServerSelectionRequest(BaseCluster.java:299)
	at com.mongodb.internal.connection.BaseCluster.selectServerAsync(BaseCluster.java:155)
	at com.mongodb.internal.connection.SingleServerCluster.selectServerAsync(SingleServerCluster.java:42)
	at com.mongodb.internal.binding.AsyncClusterBinding.getAsyncClusterBindingConnectionSource(AsyncClusterBinding.java:99)
	at com.mongodb.internal.binding.AsyncClusterBinding.getReadConnectionSource(AsyncClusterBinding.java:84)
	at com.mongodb.internal.async.client.ClientSessionBinding.getReadConnectionSource(ClientSessionBinding.java:58)
	at com.mongodb.internal.operation.OperationHelper.withAsyncReadConnection(OperationHelper.java:677)
	at com.mongodb.internal.operation.FindOperation.executeAsync(FindOperation.java:689)
	at com.mongodb.internal.async.client.OperationExecutorImpl$1$1.onResult(OperationExecutorImpl.java:86)
	at com.mongodb.internal.async.client.OperationExecutorImpl$1$1.onResult(OperationExecutorImpl.java:74)
	at com.mongodb.internal.async.client.OperationExecutorImpl.getReadWriteBinding(OperationExecutorImpl.java:177)
	at com.mongodb.internal.async.client.OperationExecutorImpl.access$200(OperationExecutorImpl.java:43)
	at com.mongodb.internal.async.client.OperationExecutorImpl$1.onResult(OperationExecutorImpl.java:72)
	at com.mongodb.internal.async.client.OperationExecutorImpl$1.onResult(OperationExecutorImpl.java:66)
	at com.mongodb.internal.async.client.ClientSessionHelper.createClientSession(ClientSessionHelper.java:60)
	at com.mongodb.internal.async.client.ClientSessionHelper.withClientSession(ClientSessionHelper.java:51)
	at com.mongodb.internal.async.client.OperationExecutorImpl.execute(OperationExecutorImpl.java:66)
	at com.mongodb.internal.async.client.AsyncMongoIterableImpl.batchCursor(AsyncMongoIterableImpl.java:167)
	at com.mongodb.reactivestreams.client.internal.MongoIterableSubscription.requestInitialData(MongoIterableSubscription.java:45)
	at com.mongodb.reactivestreams.client.internal.AbstractSubscription.tryRequestInitialData(AbstractSubscription.java:177)
	at com.mongodb.reactivestreams.client.internal.AbstractSubscription.request(AbstractSubscription.java:100)
	at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:235)
	at com.mongodb.reactivestreams.client.internal.MongoIterableSubscription.<init>(MongoIterableSubscription.java:39)
	at com.mongodb.reactivestreams.client.internal.Publishers.lambda$publish$0(Publishers.java:43)
	at com.mongodb.reactivestreams.client.internal.FindPublisherImpl.subscribe(FindPublisherImpl.java:175)
	at reactor.core.publisher.FluxSource.subscribe(FluxSource.java:66)
	at reactor.core.publisher.Flux.subscribe(Flux.java:8147)
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:195)
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
	at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:73)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784)
	at reactor.core.publisher.MonoSupplier.subscribe(MonoSupplier.java:61)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4046)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:172)
	at reactor.core.publisher.FluxFilter$FilterSubscriber.onError(FluxFilter.java:157)
	at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onError(FluxMap.java:259)
	at reactor.core.publisher.Operators.error(Operators.java:196)
	at reactor.core.publisher.MonoError.subscribe(MonoError.java:52)
	at reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:55)
	at reactor.core.publisher.Flux.subscribe(Flux.java:8147)
	at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:199)
	at reactor.core.publisher.MonoFlatMapMany.subscribeOrReturn(MonoFlatMapMany.java:49)
	at reactor.core.publisher.Flux.subscribe(Flux.java:8133)
	at reactor.core.publisher.FluxUsingWhen.subscribe(FluxUsingWhen.java:93)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784)
	at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784)
	at reactor.core.publisher.MonoIgnoreThen$ThenAcceptInner.onNext(MonoIgnoreThen.java:305)
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784)
	at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:251)
	at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:336)
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
	at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:99)
	at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:73)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)
	at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:295)
	at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784)
	at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:159)
	at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:259)
	at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
	at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:383)
	at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:396)
	at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:540)
	at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:252)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
	at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:834)

This is happening at 100 concurrent tasks because the default connection pool size in the driver is 100. So once you hit that number your task now has to wait on a resource to become available: a connection to the database on which to send the operation.

This waiting is done in largely done in a non-blocking manner. The blocking that is being reported is just submitting a task to an ExecutorService of unbounded size, so it will never actually block significantly. If you look at the stack trace, the reported blocking is when offering an item to an unbounded blocking queue that the executor service uses to keep track of the submitted tasks.

So I think this is a false-positive report from blockhound. However, while the blocking should not be a problem, you still may have throughput limitations. The connection pool is a finite resource that all the reactive tasks have to share, so if you have more tasks than connections, then tasks will have to wait for a connection, even if that waiting does not block any threads. To increase concurrency, you can increase the connection pool max size, but there are limits to how effective that will be, and you may just move the concurrency problem from the client application to the database.

Regards,
Jeff

1 Like

Thank you Jeff for your detailed response. As you said, it may be a false positive, so I shall submit this issue to Blockhound team for their response as well.
So if I had to whitelist a mongo method, request your suggestion as to which class and method to whitelist.

I have posted this question on stackoverflow as well. Hope it is in accordance with the policy.

From the stack trace I imagine you would add com.mongodb.internal.connection.DefaultConnectionPool#getAsync(SingleResultCallback<InternalConnection>) to the allow-list, though be warned that as this is not part of the public API of the driver it is subject to change in future releases without warning.

This topic was automatically closed 5 days after the last reply. New replies are no longer allowed.