How to Leverage Change Streams and Apache Spark/PySpark for a Real-Time Machine Learning Pipeline

I am trying to figure out a way to use Apache Spark/PySpark together with the change streams feature of MongoDB.

My old solution was to load the 50 most recently inserted documents from my collection into a DataFrame, using pipeline = "{'$sort':{'_id' : -1}}" and df = spark.read.format("mongo").option("pipeline", pipeline).load().limit(50), and then iterate over that DataFrame and run the analysis. The collection that populates the DataFrame is written to by a Kafka consumer, so it is always getting new data; that is why I only fetch the X most recently inserted documents and then run some kind of analysis on the DataFrame rows.

With change streams, I would like to watch for insert operations, so that whenever new documents are inserted into the collection, the PySpark JupyterLab notebook runs, populates the DataFrame with the newly inserted records, runs the analysis, and then waits until more documents are inserted. The collection gets new documents roughly every 30 seconds, and I can change the time period in my consumer.py to give the Jupyter notebook more time to run its code.

Is there an efficient way to combine the two, or should I just use the change stream functionality by itself and forgo Apache Spark?

It sounds like it might be easier to just use the change stream functionality directly through the PyMongo driver; see "change_stream – Watch changes on a collection, database, or cluster" in the PyMongo 4.3.3 documentation. Alternatively, since you are using Kafka, you could set up MongoDB as a Kafka source (this uses change streams under the covers), get the data into a Kafka topic, and then process it with Spark from there.
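
As a rough illustration of the first option, here is a minimal sketch of watching a collection for inserts with PyMongo and handing the new documents to an analysis step in small batches. The connection URI, database and collection names, the analyze callback, and batch_size are all hypothetical placeholders, not anything from your setup. It also assumes the deployment is a replica set or sharded cluster, since change streams are not available on a standalone server.

```python
from typing import Any, Callable, Dict, Iterable, Iterator, List

# Ask the server to send only insert events.
INSERT_ONLY = [{"$match": {"operationType": "insert"}}]


def inserted_documents(events: Iterable[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
    """Yield the inserted document carried by each insert change event."""
    for event in events:
        if event.get("operationType") == "insert":
            yield event["fullDocument"]


def batches(docs: Iterable[Dict[str, Any]], size: int) -> Iterator[List[Dict[str, Any]]]:
    """Group documents into lists of `size`; a final partial list is yielded
    only if the input ends (a live change stream normally does not end)."""
    batch: List[Dict[str, Any]] = []
    for doc in docs:
        batch.append(doc)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch


def watch_and_analyze(
    uri: str,                     # hypothetical, e.g. "mongodb://localhost:27017"
    db_name: str,                 # hypothetical database name
    coll_name: str,               # hypothetical collection name
    analyze: Callable[[List[Dict[str, Any]]], None],  # your analysis step
    batch_size: int = 1,
) -> None:
    """Block on the change stream and call `analyze` for every full batch."""
    # Imported here so the helpers above can be tried without a MongoDB install.
    from pymongo import MongoClient

    collection = MongoClient(uri)[db_name][coll_name]
    # Iterating the stream blocks until the next matching change arrives.
    with collection.watch(INSERT_ONLY) as stream:
        for batch in batches(inserted_documents(stream), batch_size):
            analyze(batch)
```

With something like this, the loop itself does the waiting, so there is no need to poll for the X most recent documents: calling watch_and_analyze(uri, "mydb", "mycoll", analyze=my_analysis) (placeholder names) would run my_analysis each time a new document arrives, or each time batch_size documents have accumulated if you raise it.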