How unwatch, close or cancel reactive change stream in spring

Hello to everyone,

I’m working with spring-boot-starte-data-mongodb-reactive and I’ve implemented a reactive change stream to watch a collection, but I don’t know how to close the stream when the client is disconnected or when I cancel the stream, this is my service:

 public Flux<Product> findAndWatchAllById(String id) {
        return reactiveMongoTemplate.changeStream(Product.class)

In my controller I call the service:

 @GetMapping(value = "/{cartId}/products")
    public Flux<XSellingProductDTO> getProducts(@PathVariable String id) {
        return service.findAndWatchAllById(id)
                .doOnCancel(() ->"cancelling consuming..."))

I’m using take() operator to cancel the subscription if the client still connected for more than ten seconds, but the change stream doesn’t close, thoughts?