Hello everyone,
I’m working with spring-boot-starter-data-mongodb-reactive and I’ve implemented a reactive change stream to watch a collection, but I don’t know how to close the stream when the client disconnects or when I cancel the subscription. This is my service:
public Flux<Product> findAndWatchAllById(String id) {
    return reactiveMongoTemplate.changeStream(Product.class)
            .watchCollection("products")
            .filter(where("operationType").is("insert").and("id").is(id))
            .listen()
            .map(ChangeStreamEvent::getBody);
}
In my controller I call the service:
@GetMapping(value = "/{cartId}/products")
public Flux<XSellingProductDTO> getProducts(@PathVariable("cartId") String id) {
    return service.findAndWatchAllById(id)
            .doOnCancel(() -> logger.info("cancelling consuming..."))
            .take(Duration.ofSeconds(10));
}
I’m using the take() operator to cancel the subscription if the client stays connected for more than ten seconds, but the change stream doesn’t close. Any thoughts?
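For context, here is a minimal sketch of the behaviour I expect from cancellation. It uses only the JDK's java.util.concurrent.Flow interfaces, not Reactor or the MongoDB driver. CountingSource and its closed flag are hypothetical stand-ins for the change stream and its cursor, and takeAndCount is a simplified, count-based imitation of take(), so this only illustrates the cancel signal travelling upstream, not how Spring Data actually behaves:

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class CancelPropagationDemo {

    /** Hypothetical stand-in for a change stream: emits integers until cancelled. */
    static final class CountingSource implements Flow.Publisher<Integer> {
        // Set to true when the subscriber cancels; models "cursor closed".
        final AtomicBoolean closed = new AtomicBoolean(false);

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                int next = 0;

                @Override
                public void request(long n) {
                    // Emit up to n items, but stop as soon as cancel() has been called.
                    for (long i = 0; i < n && !closed.get(); i++) {
                        subscriber.onNext(next++);
                    }
                }

                @Override
                public void cancel() {
                    // This is where a real source would release its resources.
                    closed.set(true);
                }
            });
        }
    }

    /** Receives items until `limit` is reached, then cancels the upstream subscription. */
    static int takeAndCount(CountingSource source, int limit) {
        AtomicInteger received = new AtomicInteger();
        source.subscribe(new Flow.Subscriber<Integer>() {
            Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(Long.MAX_VALUE); // unbounded demand
            }

            @Override
            public void onNext(Integer item) {
                if (received.incrementAndGet() >= limit) {
                    subscription.cancel(); // the signal that should reach the source
                }
            }

            @Override
            public void onError(Throwable t) { }

            @Override
            public void onComplete() { }
        });
        return received.get();
    }

    public static void main(String[] args) {
        CountingSource source = new CountingSource();
        int received = takeAndCount(source, 3);
        // prints "received=3 closed=true"
        System.out.println("received=" + received + " closed=" + source.closed.get());
    }
}
```

In this sketch the source stops emitting and marks itself closed as soon as cancel() is called. That is what I expected take() to trigger on the change stream as well.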
Thanks!