For the most recent version of the reference documentation, see our MongoDB Java Driver documentation site.
- MongoDB Driver
- Tutorials
- Change Streams
Change Streams - Draft
MongoDB 3.6 introduces a new $changeStream
aggregation pipeline
operator.
Change streams provide a way to watch changes to documents in a collection. To improve the usability of this new stage, the
MongoCollection
API includes a new watch
method. The ChangeStreamIterable
sets up the change stream and automatically attempts
to resume if it encounters a potentially recoverable error.
Prerequisites
The example below requires a
restaurants
collection in thetest
database. To create and populate the collection, follow the directions in github.Include the following import statements:
import com.mongodb.Block;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoClient;
import com.mongodb.ConnectionString;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import org.bson.Document;
- Include the following code which the examples in the tutorials will use to print the results of the change stream:
Block<ChangeStreamDocument<Document>> printBlock = new Block<>() {
@Override
public void apply(final ChangeStreamDocument<Document> changeStreamDocument) {
System.out.println(changeStreamDocument);
}
};
Connect to a MongoDB Deployment
Connect to a MongoDB deployment and declare and define a MongoDatabase
and a MongoCollection
instance.
For example, include the following code to connect to a replicaSet MongoDB deployment running on localhost on ports 27017
, 27018
and 27019
.
It also defines database
to refer to the test
database and collection
to refer to the restaurants
collection.
MongoClient mongoClient = MongoClients.create(new ConnectionString("mongodb://localhost:27017,localhost:27018,localhost:27019"));
MongoDatabase database = mongoClient.getDatabase("test");
MongoCollection<Document> collection = database.getCollection("restaurants");
For additional information on connecting to MongoDB, see Connect to MongoDB.
Watch the collection
To create a change stream use the the MongoCollection.watch()
method.
In the following example, the change stream prints out all changes it observes.
collection.watch().forEach(printBlock);
Filtering content
The watch
method can also be passed a list of aggregation stages, that can modify
the data returned by the $changeStream
operator. Note: not all aggregation operators are supported. See the
$changeStream
documentation for more information.
In the following example the change stream prints out all changes it observes, for insert
, update
, replace
and delete
operations:
First it uses a
$match
stage to filter for documents where theoperationType
is either aninsert
,update
,replace
ordelete
.Then, it sets the
fullDocument
toFullDocument.UPDATE_LOOKUP
, so that the document after the update is included in the results.
collection.watch(asList(Aggregates.match(Filters.in("operationType", asList("insert", "update", "replace", "delete")))))
.fullDocument(FullDocument.UPDATE_LOOKUP).forEach(printBlock);