Using Change Streams
MongoSwift 0.2.0 added support for change streams, which allow applications to access real-time data changes. Applications can use change streams to subscribe to all data changes on a single collection, a database, or an entire deployment, and immediately react to them. Because change streams use the aggregation framework, applications can also filter for specific changes or transform the notifications at will.
Note: Change streams are only supported on MongoDB replica sets and sharded clusters; they are not available on standalone deployments.
Examples
Open a Change Stream on a MongoCollection<BSONDocument> (MongoDB 3.6+)
let elg = MultiThreadedEventLoopGroup(numberOfThreads: 4)
let client = try MongoClient(using: elg)
let inventory = client.db("example").collection("inventory")
inventory.watch().flatMap { stream in // a `ChangeStream<ChangeStreamEvent<BSONDocument>>`
    stream.forEach { event in
        // process `ChangeStreamEvent<BSONDocument>` here
    }
}.whenFailure { error in
    // handle error
}
// perform some operations using `inventory`...
Open a Change Stream on a MongoCollection<MyCodableType> (MongoDB 3.6+)
let elg = MultiThreadedEventLoopGroup(numberOfThreads: 4)
let client = try MongoClient(using: elg)
let inventory = client.db("example").collection("inventory", withType: MyCodableType.self)
inventory.watch().flatMap { stream in // a `ChangeStream<ChangeStreamEvent<MyCodableType>>`
    stream.forEach { event in
        // process `ChangeStreamEvent<MyCodableType>` here
    }
}.whenFailure { error in
    // handle error
}
// perform some operations using `inventory`...
Use a Custom Codable Type for the fullDocument Property of Returned ChangeStreamEvents
let elg = MultiThreadedEventLoopGroup(numberOfThreads: 4)
let client = try MongoClient(using: elg)
let inventory = client.db("example").collection("inventory")
inventory.watch(withFullDocumentType: MyCodableType.self).flatMap { stream in // a `ChangeStream<ChangeStreamEvent<MyCodableType>>`
    stream.forEach { event in
        // process `ChangeStreamEvent<MyCodableType>` here
    }
}.whenFailure { error in
    // handle error
}
// perform some operations using `inventory`...
Use a Custom Codable Type for the Return Type of ChangeStream.next()
let elg = MultiThreadedEventLoopGroup(numberOfThreads: 4)
let client = try MongoClient(using: elg)
let inventory = client.db("example").collection("inventory")
inventory.watch(withEventType: MyCodableType.self).flatMap { stream in // a `ChangeStream<MyCodableType>`
    stream.forEach { event in
        // process `MyCodableType` here
    }
}.whenFailure { error in
    // handle error
}
// perform some operations using `inventory`...
Open a Change Stream on a MongoDatabase (MongoDB 4.0+)
let elg = MultiThreadedEventLoopGroup(numberOfThreads: 4)
let client = try MongoClient(using: elg)
let db = client.db("example")
db.watch().flatMap { stream in // a `ChangeStream<ChangeStreamEvent<BSONDocument>>`
    stream.forEach { event in
        // process `ChangeStreamEvent<BSONDocument>` here
    }
}.whenFailure { error in
    // handle error
}
// perform some operations using `db`...
Note: The type of the fullDocument property, as well as the return type of ChangeStream.next(), may be customized in the same way as shown in the MongoCollection examples above.
Open a Change Stream on a MongoClient (MongoDB 4.0+)
let elg = MultiThreadedEventLoopGroup(numberOfThreads: 4)
let client = try MongoClient(using: elg)
client.watch().flatMap { stream in // a `ChangeStream<ChangeStreamEvent<BSONDocument>>`
    stream.forEach { event in
        // process `ChangeStreamEvent<BSONDocument>` here
    }
}.whenFailure { error in
    // handle error
}
// perform some operations using `client`...
Note: The type of the fullDocument property, as well as the return type of ChangeStream.next(), may be customized in the same way as shown in the MongoCollection examples above.
Resume a Change Stream
let elg = MultiThreadedEventLoopGroup(numberOfThreads: 4)
let client = try MongoClient(using: elg)
let inventory = client.db("example").collection("inventory")
inventory.watch().flatMap { stream -> EventLoopFuture<ChangeStream<ChangeStreamEvent<BSONDocument>>> in
    // read the first change event
    stream.next().flatMap { _ in
        // simulate an error by killing the stream
        stream.kill()
    }.flatMap { _ in
        // create a new change stream that starts after the first change event
        let resumeToken = stream.resumeToken
        return inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken))
    }
}.flatMap { resumedStream in
    resumedStream.forEach { event in
        // process `ChangeStreamEvent<BSONDocument>` here
    }
}.whenFailure { error in
    // handle error
}
// perform some operations using `inventory`...
Modify Change Stream Output
let elg = MultiThreadedEventLoopGroup(numberOfThreads: 4)
let client = try MongoClient(using: elg)
let inventory = client.db("example").collection("inventory")
// Only include events where the changed document's username = "alice"
let pipeline: [BSONDocument] = [
    ["$match": ["fullDocument.username": "alice"]]
]
inventory.watch(pipeline).flatMap { stream in // a `ChangeStream<ChangeStreamEvent<BSONDocument>>`
    stream.forEach { event in
        // process `ChangeStreamEvent<BSONDocument>` here
    }
}.whenFailure { error in
    // handle error
}
// perform some operations using `inventory`...
 View on GitHub
            View on GitHub
           Install in Dash
            Install in Dash
           Change-Streams  Reference
      Change-Streams  Reference