ChangeStreams create too many connections, how to close them? (ReactiveMongoTemplate)

Hello,

I have been using ReactiveMongoTemplate ChangeStreams in certain parts of my code, to listen to specific changes of documents in the database.

From the front-end I open up an EventSource to open a stream to the code as shown below. Whenever I close the stream on the front-end, it seams that the ChangeStream is still open. This causes for my application to use an abundant amount of connections, especially with all the different ChangeStreams for the various documents that I have.

Is there a possibility to close a ChangeStream or somehow get more control of it? I have searched allover and am unable to find an answer to this.

    public Flux<Run> watchSingleRun(String userId, String runId) {
        ChangeStreamOptions changeStreamOptions = ChangeStreamOptions.builder()
                .filter(Aggregation.newAggregation(SCMIntegration.class, Aggregation.match(
                        new Criteria().orOperator(
                                Criteria.where("operationType").is(OperationType.UPDATE.getValue()),
                                Criteria.where("operationType").is(OperationType.REPLACE.getValue()),
                                Criteria.where("operationType").is(OperationType.INSERT.getValue())))))
                .returnFullDocumentOnUpdate().build();

        Mono<User> userMono = userRepository.findById(userId);
        Mono<Team> teamMono = userMono.flatMap(user -> teamRepository.findUserInTeams(user.get_id()));

        return reactiveMongoTemplate.changeStream("runs", changeStreamOptions, Run.class).zipWith(teamMono)
                .filter(objects -> objects.getT1().getBody().get_id().equals(runId))
                .flatMap(objects -> {
                    ChangeStreamEvent<Run> runChangeStreamEvent = objects.getT1();
                    Team team = objects.getT2();

                    if(runChangeStreamEvent.getBody().getTeamId().equals(team.get_id())) {
                        return Flux.just(runChangeStreamEvent.getBody());
                    } else {
                        return Mono.error(new DataNotFoundException("Data not found or it does not belong to you."));
                    }
                }).repeat();
    }

1 Like

I am joining the question although I am node js environment.

from what I have seen from the documentation the recommendation is to set a timeout at the creation of the change stream… (I didn’t know how should I create it again without creating infinite loops when there is an issue)

it looks like it created 50 connections for one mongo client with 2 change stream watchers in it.
how many new connections have you felt being created?