How to unwatch, close, or cancel a reactive change stream in Spring

Hello everyone,

I’m working with spring-boot-starter-data-mongodb-reactive and have implemented a reactive change stream to watch a collection. However, I don’t know how to close the stream when the client disconnects or when I cancel the subscription. This is my service:

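    // where(...) is statically imported from
    // org.springframework.data.mongodb.core.query.Criteria
    public Flux<Product> findAndWatchAllById(String id) {
        return reactiveMongoTemplate.changeStream(Product.class)
                .watchCollection("products")
                // only insert events for the given id
                .filter(where("operationType").is("insert").and("id").is(id))
                .listen()
                .map(ChangeStreamEvent::getBody);
    }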

In my controller I call the service:

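    @GetMapping(value = "/{cartId}/products")
    // The path variable is named cartId, so it is bound explicitly to the id parameter
    public Flux<XSellingProductDTO> getProducts(@PathVariable("cartId") String id) {
        return service.findAndWatchAllById(id)
                .doOnCancel(() -> logger.info("cancelling consuming..."))
                // take(Duration) cancels the upstream subscription after ten seconds
                .take(Duration.ofSeconds(10));
    }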

I’m using the take() operator to cancel the subscription if the client stays connected for more than ten seconds, but the change stream doesn’t close. Any thoughts?
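
To make clear what I expect to happen on cancellation, here is a minimal sketch using only the JDK's java.util.concurrent.Flow types rather than Reactor or the MongoDB driver. FakeChangeStream and TakeSubscriber are hypothetical names invented for this illustration, and the sketch cancels after a number of items instead of a duration. It does not reproduce what the driver does; it only shows the cancel signal reaching the source, which is the point where I would expect the cursor to be released.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class CancelPropagationSketch {

    /** Hypothetical stand-in for a change stream: emits integers on demand until cancelled. */
    static final class FakeChangeStream implements Flow.Publisher<Integer> {
        // Simplification: one flag shared by all subscribers of this publisher.
        final AtomicBoolean closed = new AtomicBoolean(false);

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 0;

                @Override
                public void request(long n) {
                    // Emit up to n items, stopping as soon as the subscriber has cancelled.
                    for (long i = 0; i < n && !closed.get(); i++) {
                        subscriber.onNext(next++);
                    }
                }

                @Override
                public void cancel() {
                    // Assumption: a real source would release its cursor here.
                    closed.set(true);
                }
            });
        }
    }

    /** Cancels the subscription after `limit` items, roughly what a take-style operator does. */
    static final class TakeSubscriber implements Flow.Subscriber<Integer> {
        final int limit;
        final AtomicInteger received = new AtomicInteger();
        private Flow.Subscription subscription;

        TakeSubscriber(int limit) {
            this.limit = limit;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(Long.MAX_VALUE); // unbounded demand; cancellation ends the stream
        }

        @Override
        public void onNext(Integer item) {
            if (received.incrementAndGet() >= limit) {
                subscription.cancel(); // the signal the source must react to
            }
        }

        @Override
        public void onError(Throwable t) { }

        @Override
        public void onComplete() { }
    }

    public static void main(String[] args) {
        FakeChangeStream source = new FakeChangeStream();
        TakeSubscriber subscriber = new TakeSubscriber(3);
        source.subscribe(subscriber);
        System.out.println("received=" + subscriber.received.get()
                + ", closed=" + source.closed.get());
    }
}
```

In this sketch the source stops emitting once cancel() is called. In my application doOnCancel does not seem to lead to the same result for the change stream.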

Thanks!
