How to convert BSON Timestamp to ISODate or Python datetime w/o loss of microseconds?

Hi,

How to convert BSON Timestamp to ISODate or Python datetime w/o loss of microseconds?
I found that the code below drops the microseconds part, so when the result is used in a query for changes, the change with that same clusterTime is always treated as a new one:
src_last_change_date = datetime.fromtimestamp(src_last_change_time.time, tz=timezone.utc)

Thank you!

BSON Timestamp has only second-level precision, not millisecond or microsecond precision. See the BSON (Binary JSON) Specification.
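To make the point concrete: a BSON Timestamp carries a seconds value plus an increment that orders operations within the same second. The sketch below uses a hypothetical stand-in class `Ts` (not the real `bson.timestamp.Timestamp`, although that class also exposes `time` and `inc`). It shows that converting only `.time` to a datetime collapses events from the same second, while comparing the `(time, inc)` pair keeps them distinct.

```python
from collections import namedtuple
from datetime import datetime, timezone

# Hypothetical stand-in for bson.timestamp.Timestamp, which also exposes
# `time` (seconds since the epoch) and `inc` (ordinal within that second).
Ts = namedtuple("Ts", ["time", "inc"])


def to_datetime(ts):
    """Convert to an aware UTC datetime; the increment is necessarily dropped."""
    return datetime.fromtimestamp(ts.time, tz=timezone.utc)


first = Ts(time=1700000000, inc=1)
second = Ts(time=1700000000, inc=2)

# Both events map to the same datetime because there is no sub-second part.
same_datetime = to_datetime(first) == to_datetime(second)

# Comparing the (time, inc) pair still tells the two events apart.
strictly_later = second > first
```

If the goal is only to ask "is there anything after the last synced change?", comparing the timestamp itself (seconds plus increment) may be enough, with no datetime conversion at all.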

Can you elaborate more on what you are trying to accomplish?

Hi Shane,

I am working on a service that will use change streams to sync changes from the US region to the China region, which is disconnected from the other regions.

For error cases, we use Event Bridge to retry, and we want to retry only those collections with pending changes w/o waiting till timeout for those w/o changes at all. So we are trying to use the saved last synced change record to check for any new changes in the collection after its clusterTime, since not every change carries a timestamp of the record itself (for example deletions, or updates w/o a last_modified field).

If clusterTime doesn’t have milliseconds, then we may have to use a workaround such as comparing the document id. However, that would be collection specific and not as good as having the millisecond info in the change event directly.

I found other posts asking about a similar issue too.

Thank you.

Mary

Thanks for the explanation.

and we want to retry only those collections with pending changes w/o waiting till timeout for those w/o changes at all

The server timeout when no changes are available defaults to 1 second but is configurable via the max_await_time_ms argument to Collection.watch():

# Watch changes on a single collection
with coll.watch(max_await_time_ms=1) as stream:
    change = stream.try_next()
    if change:
        # process...
        ...
    else:
        # Server timed out the getMore after 1ms because no changes occurred.
        pass

max_await_time_ms can even be set to 0 to time out immediately. I would warn, though, that with a small timeout you should ensure your application does not flood the server with getMores when the stream is idle. One way to avoid that is to add an application-side sleep (or other work) in between next() or try_next() calls. An example can be found here.
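As a rough illustration of the application-side sleep, here is a minimal sketch. The helper name `poll_changes` and its parameters are made up for this example; `try_next` is assumed to be any zero-argument callable that returns a change or None, such as the bound `stream.try_next` of an open change stream.

```python
import time


def poll_changes(try_next, handle, idle_sleep_s=0.5, max_polls=10, sleep=time.sleep):
    """Poll `try_next` up to `max_polls` times, sleeping only when idle.

    Returns the number of changes passed to `handle`.
    """
    handled = 0
    for _ in range(max_polls):
        change = try_next()
        if change is None:
            # The stream is idle: back off so the server is not flooded
            # with getMores.
            sleep(idle_sleep_s)
            continue
        handle(change)
        handled += 1
    return handled
```

With a real stream this could be called as `poll_changes(stream.try_next, process)` inside the `with coll.watch(...)` block, where `process` is your own handler. The `sleep` argument is injectable only so the loop can be exercised without real delays.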

Another option could be to use a cluster-level or database-level change stream which can watch all the collections in a single cursor. That way the app can retry using start_after or start_at_operation_time:

# Watch changes to all databases and all collections
with client.watch(start_at_operation_time=src_last_change_time) as stream:
    ...
# Watch changes to all collections in the "test" database
with client.test.watch(start_at_operation_time=src_last_change_time) as stream:
    ...

Is this helpful?

Hi Shane,

Thank you very much for your help.
The issue we want to address is that sometimes the change stream watch() times out even when there are changes (we are not sure whether this is environment related, including AWS). In such cases, we want to treat it as an error and enable the AWS Event Bridge timer to trigger the Lambda again (sorry, it’s a Lambda, not a service as I mentioned above), since some changes, such as licenses, have less delay tolerance. To distinguish a normal timeout w/o changes from an error case that needs a retry, we try to check for any new changes since the previously synced record.

Yeah, we are already watching multiple collections by configuration, which is another reason we don’t want to wait on, for example, the first 3 collections w/o changes when only the 4th collection has a change.
However, if the 1-second timeout is enough to pick up any new change that exists, the extra wait may not be a big deal. We still need to find out whether the timeout was caused by there being no new changes, in order to decide on a retry by the AWS Event Bridge timer.

We don’t use a timestamp for the watch since a token should be more reliable(?); that was chosen by a coworker who worked on the first version. We are enforcing in-order, no-loss sync.

However, I just added a function that uses a timestamp (the oldest timestamp from all records) to watch in the initial case, when no record has been synced yet and there is no resume token to use. Once the first record has been synced using the timestamp, we switch over to the resume token.
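The token-or-timestamp choice described here could be sketched roughly as follows. The helper `build_watch_kwargs` is hypothetical; `resume_after`, `start_at_operation_time` and `max_await_time_ms` are keyword arguments of PyMongo's `watch()`, but whether a given server (DocumentDB in particular) accepts each of them is not something this sketch checks.

```python
def build_watch_kwargs(resume_token=None, first_timestamp=None, max_await_time_ms=1000):
    """Build keyword arguments for watch().

    Prefer the saved resume token; fall back to a start timestamp only
    when nothing has been synced yet.
    """
    kwargs = {"max_await_time_ms": max_await_time_ms}
    if resume_token is not None:
        kwargs["resume_after"] = resume_token
    elif first_timestamp is not None:
        kwargs["start_at_operation_time"] = first_timestamp
    return kwargs
```

Usage would look like `coll.watch(**build_watch_kwargs(resume_token=saved_token))`, where `saved_token` is whatever was stored after the last synced change.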

I hope this helps you understand why we need to find any new changes since the previous sync. If I missed other options, please share.

Thank you!

Mary

Some code or pseudocode would help us understand your requirements and expectations here.

Here is some pseudocode:

AWS Lambda handler with DocumentDB change stream and Event Bridge as triggers:

1. Get the list of collections to check for changes
   if change stream event:
       check_collection_list = [collection_name from the event]
   else if timer from Event Bridge:
       check_collection_list = all configured collections which have new changes since the last synced record, OR any records if no record has been synced yet

2. Go through the collection list to get and process changes
for collection_name in check_collection_list:
    get saved last synced change for the collection
    if no last synced record:
        sync_first_change(collection_name, change)  # uses timestamp for watch()
    else:
        get previous token id as resume token id

    try:
        with current_collection.watch(resume_after=self.source_last_token_id,
                                      max_await_time_ms=change_await_time_ms) as stream:
            change = stream.try_next()
            while change:
                if not self.process_change(collection_name, change, False):
                    break
                change = stream.try_next()
    except pymongo.errors.PyMongoError as e:
        if e.timeout:
            # check whether the timeout was caused by there being no new changes
            if self.dbsync_tracker.has_changes_to_sync(collection_name):
                logger.error("Treat as error since timed out with more changes to sync")
                self.err_handling.on_error(str(e))
            else:
                logger.info("Timed out with NO changes to sync")
            continue
        else:
            logger.error(f"PyMongo ChangeStream Unrecoverable Error: {e}")
            self.err_handling.on_error(str(e))

Thanks for the added context. I don’t have time to help out more, perhaps someone else can take over.

Thank you, Shane.

However, I have one simple question:
Can MongoDB add the millisecond info to the change content from the change stream? It would be the simplest solution compared to all the other workarounds.

Can you bring it up with your team or a related team for consideration?

Thank you very much!

Mary

Could you use the change stream wallTime field?: https://jira.mongodb.org/browse/SERVER-65022

wallTime (ISODate): The server date and time of the database operation. wallTime differs from clusterTime in that clusterTime is a timestamp taken from the oplog entry associated with the database operation event.
New in version 6.0.
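A minimal sketch of how an application might prefer wallTime and fall back to clusterTime when the field is absent (for example on servers older than 6.0). The helper `event_time` and the sample events are illustrative only, and naive datetimes are assumed to be in UTC, which is PyMongo's default when `tz_aware` is not set.

```python
from datetime import datetime, timezone
from types import SimpleNamespace


def event_time(change):
    """Return an aware UTC datetime for a change event.

    Uses wallTime when present (sub-second precision); otherwise falls
    back to the whole seconds of clusterTime.
    """
    wall = change.get("wallTime")
    if wall is not None:
        # Assumption: a naive datetime from the driver is already UTC.
        return wall if wall.tzinfo else wall.replace(tzinfo=timezone.utc)
    return datetime.fromtimestamp(change["clusterTime"].time, tz=timezone.utc)


# Illustrative events; SimpleNamespace stands in for a BSON Timestamp.
cluster_time = SimpleNamespace(time=1704164645, inc=1)
with_wall = {"wallTime": datetime(2024, 1, 2, 3, 4, 5, 678000), "clusterTime": cluster_time}
without_wall = {"clusterTime": cluster_time}

wall_result = event_time(with_wall)
fallback_result = event_time(without_wall)
```

Note that wallTime is a wall-clock value, so it is probably better suited to display or coarse filtering than to strict ordering; the resume token or clusterTime remains the ordering key.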


Great, thanks!
So it’s available in MongoDB version 6.0?
What about the PyMongo version we are using?

Thanks!

Mary

Hi Shane,

Do you know which 6.0 releases have the fix?
I checked the MongoDB release page but cannot find 65022 listed in the change log for releases after the issue’s fix date of 10/29/23.

I am using DocumentDB and PyMongo, so I need to find out which versions of them already have this fix.

Thank you very much!

Mary

DocumentDB is not MongoDB nor supported by us so I can’t say. As you can see DocumentDB does not actually support many MongoDB features. See https://www.mongodb.com/supportability/documentdb

Yeah, looks like DocumentDB doesn’t support MongoDB 6.0 yet, but they still use MongoDB underneath just as a managed service? We are hosting our services in AWS and that’s why DocumentDB was chosen.

We may consider switching over if necessary features are missing.

However, which MongoDB release has the wallTime already in the change stream? 6.0.12?

Thank you!

Mary

The wallTime field is present in 6.0.0.

DocumentDB is not managed MongoDB, it is a different implementation with compatible interface (link
