Change stream hanging while fetching update events

So i was working on ChangeStreams test case in the mongo mflix , incase if i donot append fullDocument(FullDocument.UPDATE_LOOKUP) to db.watch(oddFilter) then the test case hangs

 @Test
  public void testWatchAllUpdates() {
    insertAndUpdateThread.start();
    System.out.println("Running test testWatchAllUpdates");
    List<Bson> oddFilter =
        singletonList(
            Aggregates.match(
                Filters.and(
                    Document.parse("{'fullDocument.even': 0}"),
                    Filters.eq("operationType", "update"))));
    List<Document> allUpdates = new ArrayList<>();
    for (ChangeStreamDocument<Document> d :
        comments.watch(oddFilter).fullDocument(FullDocument.UPDATE_LOOKUP)) {
      Document updatedDoc = d.getFullDocument();
      if (updatedDoc.containsKey("stop")) {
        break;
      }

      allUpdates.add(updatedDoc);
    }
    Assert.assertEquals(5, allUpdates.size());
  }

By looking at this code, the only way to exit the for loop is when an update with the full document contains the key stop. My guess is that having the full document is part of the requirement. So it is not really hanging.

But it should have thrown Null pointer exception right.if FullDocument.UPDATE_LOOKUP is not provided. As the mongodb doc says , For update operations, this field only appears if you configured the change stream with fullDocument set to updateLookup .

May be that condition makes sure you only process update with full documents. So I suspect that your stream actually process nothing since you will never get any document but none will have a full document on update. You might add some trace as the first statement inside the for loop to see if you process anything.

Tried the above comments, the new code snippet looks something like this. It didnot print any of the statement inside or after the for loop

  @Test
  public void testWatchAllUpdates() {
    insertAndUpdateThread.start();
    // This got printed 
    System.out.println("Running test testWatchAllUpdates");
    List<Bson> oddFilter =
        singletonList(
            Aggregates.match(
                Filters.and(
                    Document.parse("{'fullDocument.even': 0}"),
                    Filters.eq("operationType", "update"))));
    List<Document> allUpdates = new ArrayList<>();
    for (ChangeStreamDocument<Document> d :
        comments.watch(oddFilter)) {
    	// Did not print this statement
      System.out.println(" Inside for loop ");
      Document updatedDoc = d.getFullDocument();
      // Did not print this statement
      System.out.println(" Checking the doc  for stop");
      if (updatedDoc.containsKey("stop")) {
        break;
      }
     // Did not print this statement
      System.out.println(" Adding the doc ");
      allUpdates.add(updatedDoc);
    }
    // Didnot print this statement as well
    System.out.println(" Asserting the docs");
    Assert.assertEquals(5, allUpdates.size());
  }

May be you do not update any document from the collection comments that has a field named even set to 0. This is part of oddFilter.

What kind of update do you perform?

I m running the mflix ChangeStream testcase, it basically inserts documents updates document in insertAndUpdateThread. Links to the mflix code
https://s3.amazonaws.com/edu-downloads.10gen.com/M220J/2021/April/static/handouts/m220/mflix-java.zip
Adding the insert and update logic code snippet as well

  private void insertAndUpdate() {
    sleepyTime();
    int i = 0;
    while (i < 10) {
      Document doc = new Document();
      doc.append("i", i++);
      doc.append("even", i % 2);
      System.out.println(" Inserting  ----------");
      collection.insertOne(doc);
    }
    int j = 0;
    while (j < 10) {
      Bson queryFilter = new Document("i", j);
      System.out.println(" Updating  ----------");
      collection.updateOne(queryFilter, set("i", j * 10));
      j++;
    }
    Document stopDoc = new Document("stop", false);
    stopDoc.append("even", 1);
    collection.insertOne(stopDoc);
    collection.updateOne(stopDoc, set("even", 0));
    collection.updateOne(stopDoc, set("stop", true));
  }

In testWatchAllUpdates() you refer to the collection with the variable comments. In insertAndUpdate() it is named collection. If they do not refer to the same physical collection you do not know if the code should work or not.

Hey in both places comments collection is referred, the test works if

ChangeStreamDocument<Document> d :
        comments.watch(oddFilter)  // is changed to

ChangeStreamDocument<Document> d :
        comments.watch(oddFilter).fullDocument(FullDocument.UPDATE_LOOKUP)

as expected I understood why the first query wont work , the reason is, in the test we are querying

Filters.and(
                    Document.parse("{'fullDocument.even': 0}"),
                    Filters.eq("operationType", "update"))

ChangeStream for update event wont bring fullDocument unless fullDocument : updateLookup is specified The hang state is actually change stream expected behavior, the change stream cursor is kept open unless

* The cursor is explicitly closed.
* An invalidate event occurs.
* If the deployment is a sharded cluster, a shard removal may cause an open change stream cursor to close, and the closed change stream cursor may not be fully resumable.

This topic was automatically closed 5 days after the last reply. New replies are no longer allowed.