Change Streams
MongoDB 3.6 introduces a new $changeStream aggregation pipeline operator.
Change streams provide a way to watch changes to documents in a collection. To improve the usability of this new stage, the MongoCollection API includes a new watch method. The ChangeStreamObservable sets up the change stream and automatically attempts to resume if it encounters a potentially recoverable error.
Prerequisites
The example below requires a restaurants collection in the test database. To create and populate the collection, follow the directions in GitHub.
Include the following import statements:
import java.util.concurrent.CountDownLatch
import org.mongodb.scala._
import org.mongodb.scala.model.Aggregates._
import org.mongodb.scala.model.Filters._
import org.mongodb.scala.model.changestream._
Important: This guide uses the Observable implicits as covered in the Quick Start Primer.
Connect to a MongoDB Deployment
Connect to a MongoDB deployment, then declare and define a MongoDatabase and a MongoCollection instance.
For example, include the following code to connect to a replica set MongoDB deployment running on localhost on ports 27017, 27018 and 27019. It also defines database to refer to the test database and collection to refer to the restaurants collection.
val mongoClient: MongoClient = MongoClient("mongodb://localhost:27017,localhost:27018,localhost:27019")
val database: MongoDatabase = mongoClient.getDatabase("test")
val collection: MongoCollection[Document] = database.getCollection("restaurants")
For additional information on connecting to MongoDB, see Connect to MongoDB.
Watch the collection
To create a change stream, use one of the MongoCollection.watch() methods.
In the following example, the change stream prints out all changes it observes.
case class LatchedObserver() extends Observer[ChangeStreamDocument[Document]] {
  val latch = new CountDownLatch(1)

  // Request an effectively unbounded number of change events
  override def onSubscribe(subscription: Subscription): Unit = subscription.request(Long.MaxValue)

  // Print every change event as it arrives
  override def onNext(changeDocument: ChangeStreamDocument[Document]): Unit = println(changeDocument)

  // Report the error and release the waiting thread
  override def onError(throwable: Throwable): Unit = {
    println(s"Error: '$throwable'")
    latch.countDown()
  }

  override def onComplete(): Unit = latch.countDown()

  // Block the caller until the stream completes or fails
  def await(): Unit = latch.await()
}
val observer = LatchedObserver()
collection.watch().subscribe(observer)
observer.await() // Block waiting for the latch
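Because a change stream stays open until it fails or is closed, the latch in LatchedObserver is normally released only on error. The following is a minimal sketch of one way to stop after a fixed number of events. BoundedObserver and its limit parameter are hypothetical names introduced for illustration and are not part of the driver; the sketch assumes the unsubscribe() method of the driver's Subscription and the getOperationType and getDocumentKey getters of ChangeStreamDocument.

```scala
import java.util.concurrent.CountDownLatch
import org.mongodb.scala._
import org.mongodb.scala.model.changestream._

// Hypothetical variant of LatchedObserver: releases the latch after `limit` events.
class BoundedObserver(limit: Int) extends Observer[ChangeStreamDocument[Document]] {
  private val latch = new CountDownLatch(1)
  @volatile private var subscription: Option[Subscription] = None
  @volatile private var seen = 0

  override def onSubscribe(s: Subscription): Unit = {
    subscription = Some(s)        // keep a handle so the stream can be cancelled later
    s.request(Long.MaxValue)      // request data
  }

  override def onNext(change: ChangeStreamDocument[Document]): Unit = {
    seen += 1
    // Print only the operation type and the key of the affected document
    println(s"${change.getOperationType}: ${change.getDocumentKey}")
    if (seen >= limit) {
      subscription.foreach(_.unsubscribe()) // stop receiving further events
      latch.countDown()
    }
  }

  override def onError(throwable: Throwable): Unit = {
    println(s"Error: '$throwable'")
    latch.countDown()
  }

  override def onComplete(): Unit = latch.countDown()

  def count: Int = seen             // number of events observed so far
  def await(): Unit = latch.await() // block until the limit is reached, or on error/completion
}
```

It can be used in place of LatchedObserver, for example by creating new BoundedObserver(5), passing it to collection.watch().subscribe, and then calling await() on it.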
Watch the database
Starting with the 3.8 driver and MongoDB 4.0, applications can open a single change stream to watch all non-system collections of a database. To create such a change stream, use one of the MongoDatabase.watch() methods.
In the following example, the change stream prints out all the changes it observes on the given database.
val observer = LatchedObserver()
database.watch().subscribe(observer)
observer.await() // Block waiting for the latch
Watch all databases
Starting with the 3.8 driver and MongoDB 4.0, applications can open a single change stream to watch all non-system collections of all databases in a MongoDB deployment. To create such a change stream, use one of the MongoClient.watch() methods.
In the following example, the change stream prints out all the changes it observes on the deployment to which the MongoClient is connected.
val observer = LatchedObserver()
mongoClient.watch().subscribe(observer)
observer.await() // Block waiting for the latch
Filtering content
The watch method can also be passed a list of aggregation stages that modify the data returned by the $changeStream operator. Note: not all aggregation operators are supported. See the $changeStream documentation for more information.
In the following example, the change stream prints out all changes it observes for insert, update, replace and delete operations:
- First, it uses a $match stage to filter for documents where the operationType is either an insert, update, replace or delete.
- Then, it sets the fullDocument to FullDocument.UPDATE_LOOKUP, so that the document after the update is included in the results.
val observer = LatchedObserver()
// filter and in come from the Aggregates._ and Filters._ imports; in takes its values as varargs
collection.watch(Seq(filter(in("operationType", "insert", "update", "replace", "delete"))))
  .fullDocument(FullDocument.UPDATE_LOOKUP).subscribe(observer)
observer.await() // Block waiting for the latch
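The list of stages passed to watch is an ordinary sequence of Bson values, so it can be built once and reused. The sketch below defines a pipeline that keeps only insert events. The object name InsertOnlyPipeline is hypothetical and chosen for illustration; the sketch assumes the filter and in helpers from the driver's Aggregates and Filters objects.

```scala
import org.mongodb.scala.bson.conversions.Bson
import org.mongodb.scala.model.Aggregates.filter
import org.mongodb.scala.model.Filters.in

// Hypothetical holder for a reusable change stream pipeline.
object InsertOnlyPipeline {
  // A single $match stage that keeps only events whose operationType is "insert"
  val pipeline: Seq[Bson] = Seq(filter(in("operationType", "insert")))
}
```

The same value can then be passed to any of the watch methods shown earlier, for example collection.watch(InsertOnlyPipeline.pipeline).subscribe(observer), or to database.watch and mongoClient.watch in the same way.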