- Reactive Streams
- Change Streams
MongoDB 3.6 introduces a new
$changeStream aggregation pipeline
Change streams provide a way to watch changes to documents in a collection. To improve the usability of this new stage, the
MongoCollection API includes a new
watch method. The
ChangeStreamPublisher sets up the change stream and automatically attempts
to resume if it encounters a potentially recoverable error.
The example below requires a
restaurantscollection in the
testdatabase. To create and populate the collection, follow the directions in github.
Include the following import statements:
import com.mongodb.reactivestreams.client.MongoClients; import com.mongodb.reactivestreams.client.MongoClient; import com.mongodb.reactivestreams.client.MongoCollection; import com.mongodb.reactivestreams.client.MongoDatabase; import com.mongodb.client.model.Aggregates; import com.mongodb.client.model.Filters; import com.mongodb.client.model.changestream.FullDocument; import com.mongodb.client.model.changestream.ChangeStreamDocument; import org.bson.Document;
This guide uses the
Subscriber implementations as covered in the Quick Start Primer.
Connect to a MongoDB Deployment
Connect to a MongoDB deployment and declare and define a
MongoDatabase and a
For example, include the following code to connect to a replicaSet MongoDB deployment running on localhost on ports
It also defines
database to refer to the
test database and
collection to refer to the
MongoClient mongoClient = MongoClients.create("mongodb://localhost:27017,localhost:27018,localhost:27019"); MongoDatabase database = mongoClient.getDatabase("test"); MongoCollection<Document> collection = database.getCollection("restaurants");
For additional information on connecting to MongoDB, see Connect to MongoDB.
Watch the collection
To create a change stream use one of the
In the following example, the change stream prints out all changes it observes.
Watch the database
New in the 3.8 driver and MongoDB 4.0, applications can open a single change stream to watch all non-system collections of a database. To
create such a change stream use one of the
In the following example, the change stream prints out all the changes it observes on the given database.
Watch all databases
New in the 3.8 driver and MongoDB 4.0, applications can open a single change stream to watch all non-system collections of all databases
in a MongoDB deployment. To create such a change stream use one of the
In the following example, the change stream prints out all the changes it observes on the deployment to which the
watch method can also be passed a list of aggregation stages, that can modify
the data returned by the
$changeStream operator. Note: not all aggregation operators are supported. See the
$changeStream documentation for more information.
In the following example the change stream prints out all changes it observes, for
First it uses a
$matchstage to filter for documents where the
operationTypeis either an
Then, it sets the
FullDocument.UPDATE_LOOKUP, so that the document after the update is included in the results.
collection.watch(asList(Aggregates.match(Filters.in("operationType", asList("insert", "update", "replace", "delete"))))) .fullDocument(FullDocument.UPDATE_LOOKUP).subscribe(new PrintDocumentSubscriber());