Using Change Streams

The driver supports change streams, which allow applications to access real-time data changes. Applications can use change streams to subscribe to all data changes on a single collection, a database, or an entire deployment, and immediately react to them. Because change streams use the aggregation framework, applications can also filter for specific changes or transform the notifications at will.

Note: Change streams only work with MongoDB replica sets and sharded clusters.

Examples

These examples use the driver’s async/await APIs. For examples using EventLoopFutures, see the previous version of this guide.

Open a Change Stream on a MongoCollection (MongoDB 3.6+)

We recommend opening and interacting with change streams in their own Tasks, and terminating change streams by canceling their corresponding Tasks. In the following example, change stream events are processed asynchronously as they arrive on changeStreamTask until the Task is canceled. ChangeStream conforms to Swift’s AsyncSequence protocol, so it can be iterated over using a for-in loop.

struct Item: Codable {
    let _id: BSONObjectID
    let name: String
    let cost: Int
    let count: Int
}

let inventory = client.db("example").collection("inventory", withType: Item.self)
let changeStreamTask = Task {
    for try await event in try await inventory.watch() {
        // process `ChangeStreamEvent<Item>`
    }
}

// later...
changeStreamTask.cancel()
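
As a rough sketch of what the loop body might do with each event, the following switches on the event's operationType and reads its fullDocument and documentKey properties. It assumes the Item type and a MongoCollection<Item> as defined above; the helper name logInventoryChanges and the printed messages are illustrative only.

```swift
import MongoSwift

// Hypothetical helper; assumes `Item` and a `MongoCollection<Item>` as defined above.
func logInventoryChanges(in inventory: MongoCollection<Item>) -> Task<Void, Error> {
    Task {
        for try await event in try await inventory.watch() {
            switch event.operationType {
            case .insert:
                // For insert events, `fullDocument` holds the newly inserted `Item`.
                if let item = event.fullDocument {
                    print("inserted \(item.name) (count: \(item.count))")
                }
            case .delete:
                // Delete events carry no `fullDocument`; `documentKey` identifies the removed document.
                print("deleted \(String(describing: event.documentKey))")
            default:
                print("received \(event.operationType) event")
            }
        }
    }
}
```

The returned Task can be canceled in the same way as changeStreamTask above.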

If you provide a pipeline to watch that transforms the shape of the returned documents, you will need to specify a type to use for the ChangeStreamEvent.fullDocument property. You can do this by passing withFullDocumentType when calling watch:

struct ItemCount: Codable {
    let _id: BSONObjectID
    let count: Int
}

let changeStreamTask = Task {
    let pipeline: [BSONDocument] = [["$unset": ["fullDocument.name", "fullDocument.cost"]]]
    for try await event in try await inventory.watch(pipeline, withFullDocumentType: ItemCount.self) {
        // process `ChangeStreamEvent<ItemCount>`
    }
}

// later...
changeStreamTask.cancel()

You can also provide a type to use in place of ChangeStreamEvent altogether:

let changeStreamTask = Task {
    for try await event in try await inventory.watch(withEventType: InventoryEvent.self) {
        // process `InventoryEvent`
    }
}

// later...
changeStreamTask.cancel()
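
The InventoryEvent type is not defined in this guide. One possible definition, offered only as an assumption, is a Codable struct that keeps just the change event fields the application needs. The field selection below is hypothetical; it reuses the Item type and inventory collection from the earlier examples.

```swift
import MongoSwift

// Hypothetical event type (not part of the driver). It keeps only a few fields;
// property names must match the keys of the change event document.
struct InventoryEvent: Codable {
    let operationType: String
    let documentKey: BSONDocument?
    let fullDocument: Item?
}

let inventoryEventTask = Task {
    for try await event in try await inventory.watch(withEventType: InventoryEvent.self) {
        // `event` is an `InventoryEvent` here, not a `ChangeStreamEvent`.
        print("\(event.operationType): \(String(describing: event.fullDocument?.name))")
    }
}

// later...
inventoryEventTask.cancel()
```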

Open a Change Stream on a MongoDatabase (MongoDB 4.0+)

You can also open a change stream on an entire database, which will observe events on all collections in the database:

let db = client.db("example")

let changeStreamTask = Task {
    for try await event in try await db.watch() {
        // process `ChangeStreamEvent<BSONDocument>`
    }
}

// later...
changeStreamTask.cancel()

Note: The type of the ChangeStreamEvent.fullDocument property, as well as the return type of ChangeStream.next(), may be customized in the same fashion as in the MongoCollection examples above, by passing withFullDocumentType or withEventType to watch().

Open a Change Stream on a MongoClient (MongoDB 4.0+)

You can also open a change stream on an entire cluster, which will observe events on all databases and collections:

let changeStreamTask = Task {
    for try await event in try await client.watch() {
        // process `ChangeStreamEvent<BSONDocument>`
    }
}

// later...
changeStreamTask.cancel()

Note: The type of the ChangeStreamEvent.fullDocument property, as well as the return type of ChangeStream.next(), may be customized in the same fashion as in the MongoCollection examples above, by passing withFullDocumentType or withEventType to watch().

Resume a Change Stream

Change streams can be resumed from particular points in time using resume tokens. For example:

let inventory = client.db("example").collection("inventory")

let changeStreamTask1 = Task { () -> ResumeToken? in
    let changeStream = try await inventory.watch()
    // read the first change event
    _ = try await changeStream.next()
    // resume token to resume stream after the first event
    return changeStream.resumeToken
}

// Get resume token from the first task and change stream.
guard let resumeToken = try await changeStreamTask1.value else {
    fatalError("Unexpectedly missing resume token after processing event")
}

let changeStreamTask2 = Task {
    let changeStream = try await inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken))
    for try await event in changeStream {
        // process ChangeStreamEvent
    }
}

// later...
changeStreamTask2.cancel()
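
Each ChangeStreamEvent also carries its own resume token in its _id property, so an application can record a token after every processed event and resume from the most recent one after a restart. The following is a minimal sketch of that idea. The function name and the saveToken closure are hypothetical, and how the token is actually stored is left to the application.

```swift
import MongoSwift

// Hypothetical helper: resumes after `token` (if any) and reports the token of
// each processed event through `saveToken` so the caller can store it.
func watchInventory(
    _ inventory: MongoCollection<BSONDocument>,
    resumeAfter token: ResumeToken?,
    saveToken: (ResumeToken) -> Void
) async throws {
    let options = ChangeStreamOptions(resumeAfter: token)
    let changeStream = try await inventory.watch(options: options)
    for try await event in changeStream {
        // process ChangeStreamEvent, then record where to resume from
        saveToken(event._id)
    }
}
```

Passing nil for resumeAfter starts a fresh change stream, so the same function can serve both the first run and later resumptions.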

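Modify Change Stream Output

You can pass an aggregation pipeline to watch to filter or transform the events that a change stream returns. For example: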

let inventory = client.db("example").collection("inventory", withType: Item.self)

let changeStreamTask = Task {
    // Only include events where the changed document's count = 0
    let pipeline: [BSONDocument] = [
        ["$match": ["fullDocument.count": 0]]
    ]
    for try await event in try await inventory.watch(pipeline) {
        // process `ChangeStreamEvent<Item>`
    }
}

// later...
changeStreamTask.cancel()
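
A filter on fullDocument only matches events that include that field. By default, update events do not include the full document, so the sketch below requests it with the fullDocument option in ChangeStreamOptions. It assumes the inventory collection from the example above; the names soldOutPipeline, lookupOptions, and soldOutTask are illustrative.

```swift
import MongoSwift

// Match update events whose looked-up document has a count of 0.
let soldOutPipeline: [BSONDocument] = [
    ["$match": ["operationType": "update", "fullDocument.count": 0]]
]

// Ask the server to look up and attach the current document on update events.
let lookupOptions = ChangeStreamOptions(fullDocument: .updateLookup)

let soldOutTask = Task {
    for try await event in try await inventory.watch(soldOutPipeline, options: lookupOptions) {
        // `event.fullDocument` holds the looked-up `Item`
        print(String(describing: event.fullDocument?.name))
    }
}

// later...
soldOutTask.cancel()
```

The looked-up document reflects the document's state at lookup time, which may include changes made after the update that triggered the event.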

See Also