I am trying to figure out a way to combine Apache Spark (PySpark) with MongoDB's change stream feature.
My old solution was to load the 50 most recent documents inserted into my collection, using the pipeline pipeline = "{'$sort': {'_id': -1}}"
and df = spark.read.format("mongo").option("pipeline", pipeline).load().limit(50)
, into a DataFrame, and then iterate over that DataFrame and run analysis. The collection populating the DataFrame is fed by a Kafka consumer, so it is always receiving new data; hence I only pull the X most recently inserted documents and then run some kind of analysis on the DataFrame rows.
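For reference, the batch read described above can be sketched roughly like this (the URI, database, and collection names are placeholders, and the mongo-spark connector is assumed to be on the classpath; pushing the $limit into the pipeline itself avoids fetching the whole sorted collection before .limit() is applied):

```python
import json


def latest_docs_pipeline(n=50):
    # MongoDB aggregation pipeline as a JSON string for the connector's
    # "pipeline" option: newest documents first (descending _id), capped at n.
    return json.dumps([{"$sort": {"_id": -1}}, {"$limit": n}])


def read_latest(spark, n=50):
    # Sketch: assumes spark.mongodb.input.uri is already configured on the
    # SparkSession (placeholder connection details).
    return (
        spark.read.format("mongo")
        .option("pipeline", latest_docs_pipeline(n))
        .load()
    )
```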
With change streams I would like to watch for the insert action, so that whenever new documents are inserted into the collection, the PySpark JupyterLab notebook runs, populates the DataFrame with those newly inserted records, runs the analysis, and then waits until new documents are inserted into the collection. The collection gets new documents roughly every 30 seconds, and I can change the interval in my consumer.py to give the notebook more time to run its code.
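A minimal sketch of the change-stream side, using pymongo, might look like the following. The URI, database, collection, batch size, and run_analysis are all placeholders for illustration; note that change streams require a replica set (or sharded cluster), and watch() blocks until the next matching event arrives:

```python
def insert_only_pipeline():
    # Change-stream pipeline that keeps only insert events.
    return [{"$match": {"operationType": "insert"}}]


def run_analysis(docs):
    # Hypothetical hook: e.g. spark.createDataFrame(docs) followed by
    # the existing analysis from the batch approach.
    pass


def watch_inserts(uri="mongodb://localhost:27017", db="mydb", coll="mycoll"):
    # Placeholder connection details; pymongo imported here so the
    # helper functions above stay importable without it.
    from pymongo import MongoClient

    collection = MongoClient(uri)[db][coll]
    batch = []
    with collection.watch(insert_only_pipeline()) as stream:
        for event in stream:
            batch.append(event["fullDocument"])  # the inserted document
            if len(batch) >= 50:  # or flush on a timer instead
                run_analysis(batch)
                batch.clear()
```

One design note: collecting inserted documents into a batch and handing the batch to Spark keeps the per-event overhead of creating a DataFrame from dominating the roughly 30-second insert cadence.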
Is there an efficient way to merge the two, or should I just use the change stream functionality by itself and forgo Apache Spark?