Hello,
I have been using ReactiveMongoTemplate ChangeStreams in certain parts of my code, to listen to specific changes of documents in the database.
From the front-end I open up an EventSource to open a stream to the code as shown below. Whenever I close the stream on the front-end, it seams that the ChangeStream is still open. This causes for my application to use an abundant amount of connections, especially with all the different ChangeStreams for the various documents that I have.
Is there a possibility to close a ChangeStream or somehow get more control of it? I have searched allover and am unable to find an answer to this.
public Flux<Run> watchSingleRun(String userId, String runId) {
ChangeStreamOptions changeStreamOptions = ChangeStreamOptions.builder()
.filter(Aggregation.newAggregation(SCMIntegration.class, Aggregation.match(
new Criteria().orOperator(
Criteria.where("operationType").is(OperationType.UPDATE.getValue()),
Criteria.where("operationType").is(OperationType.REPLACE.getValue()),
Criteria.where("operationType").is(OperationType.INSERT.getValue())))))
.returnFullDocumentOnUpdate().build();
Mono<User> userMono = userRepository.findById(userId);
Mono<Team> teamMono = userMono.flatMap(user -> teamRepository.findUserInTeams(user.get_id()));
return reactiveMongoTemplate.changeStream("runs", changeStreamOptions, Run.class).zipWith(teamMono)
.filter(objects -> objects.getT1().getBody().get_id().equals(runId))
.flatMap(objects -> {
ChangeStreamEvent<Run> runChangeStreamEvent = objects.getT1();
Team team = objects.getT2();
if(runChangeStreamEvent.getBody().getTeamId().equals(team.get_id())) {
return Flux.just(runChangeStreamEvent.getBody());
} else {
return Mono.error(new DataNotFoundException("Data not found or it does not belong to you."));
}
}).repeat();
}