Using Change Streams
MongoSwift 0.2.0 added support for change streams, which allow applications to access real-time data changes. Applications can use change streams to subscribe to all data changes on a single collection, a database, or an entire deployment, and immediately react to them. Because change streams use the aggregation framework, applications can also filter for specific changes or transform the notifications at will.
Note: Change streams only work with MongoDB replica sets and sharded clusters.
Examples
Open a Change Stream on a MongoCollection<BSONDocument>
(MongoDB 3.6+)
// Create the event loop group handed to the client, then look up the collection to watch.
let elg = MultiThreadedEventLoopGroup(numberOfThreads: 4)
let client = try MongoClient(using: elg)
let exampleDB = client.db("example")
let inventory = exampleDB.collection("inventory")

// `watch()` resolves to a `ChangeStream<ChangeStreamEvent<BSONDocument>>`.
let processing = inventory.watch().flatMap { changeStream in
    changeStream.forEach { change in
        // process `ChangeStreamEvent<BSONDocument>` here
    }
}
processing.whenFailure { failure in
    // handle error
}

// perform some operations using `inventory`...
Open a Change Stream on a MongoCollection<MyCodableType>
(MongoDB 3.6+)
// Create the event loop group handed to the client, then look up a collection typed as `MyCodableType`.
let elg = MultiThreadedEventLoopGroup(numberOfThreads: 4)
let client = try MongoClient(using: elg)
let exampleDB = client.db("example")
let inventory = exampleDB.collection("inventory", withType: MyCodableType.self)

// `watch()` resolves to a `ChangeStream<ChangeStreamEvent<MyCodableType>>`.
let processing = inventory.watch().flatMap { changeStream in
    changeStream.forEach { change in
        // process `ChangeStreamEvent<MyCodableType>` here
    }
}
processing.whenFailure { failure in
    // handle error
}

// perform some operations using `inventory`...
Use a Custom Codable Type for the fullDocument Property of Returned ChangeStreamEvents
// Create the event loop group handed to the client, then look up the collection to watch.
let elg = MultiThreadedEventLoopGroup(numberOfThreads: 4)
let client = try MongoClient(using: elg)
let exampleDB = client.db("example")
let inventory = exampleDB.collection("inventory")

// Passing `withFullDocumentType:` resolves to a `ChangeStream<ChangeStreamEvent<MyCodableType>>`.
let typedWatch = inventory.watch(withFullDocumentType: MyCodableType.self)
let processing = typedWatch.flatMap { changeStream in
    changeStream.forEach { change in
        // process `ChangeStreamEvent<MyCodableType>` here
    }
}
processing.whenFailure { failure in
    // handle error
}

// perform some operations using `inventory`...
Use a Custom Codable Type for the Return Type of ChangeStream.next()
// Create the event loop group handed to the client, then look up the collection to watch.
let elg = MultiThreadedEventLoopGroup(numberOfThreads: 4)
let client = try MongoClient(using: elg)
let exampleDB = client.db("example")
let inventory = exampleDB.collection("inventory")

// Passing `withEventType:` resolves to a `ChangeStream<MyCodableType>`.
let typedWatch = inventory.watch(withEventType: MyCodableType.self)
let processing = typedWatch.flatMap { changeStream in
    changeStream.forEach { change in
        // process `MyCodableType` here
    }
}
processing.whenFailure { failure in
    // handle error
}

// perform some operations using `inventory`...
Open a Change Stream on a MongoDatabase
(MongoDB 4.0+)
// Create the event loop group handed to the client, then look up the database to watch.
let elg = MultiThreadedEventLoopGroup(numberOfThreads: 4)
let client = try MongoClient(using: elg)
let db = client.db("example")

// `watch()` resolves to a `ChangeStream<ChangeStreamEvent<BSONDocument>>`.
let processing = db.watch().flatMap { changeStream in
    changeStream.forEach { change in
        // process `ChangeStreamEvent<BSONDocument>` here
    }
}
processing.whenFailure { failure in
    // handle error
}

// perform some operations using `db`...
Note: the type of the fullDocument property, as well as the return type of ChangeStream.next(), may be customized in the same fashion as in the MongoCollection examples above.
Open a Change Stream on a MongoClient
(MongoDB 4.0+)
// Create the event loop group and the client whose change stream is opened below.
let elg = MultiThreadedEventLoopGroup(numberOfThreads: 4)
let client = try MongoClient(using: elg)

// `watch()` resolves to a `ChangeStream<ChangeStreamEvent<BSONDocument>>`.
let processing = client.watch().flatMap { changeStream in
    changeStream.forEach { change in
        // process `ChangeStreamEvent<BSONDocument>` here
    }
}
processing.whenFailure { failure in
    // handle error
}

// perform some operations using `client`...
Note: the type of the fullDocument property, as well as the return type of ChangeStream.next(), may be customized in the same fashion as in the MongoCollection examples above.
Resume a Change Stream
// Create the event loop group handed to the client, then look up the collection to watch.
let elg = MultiThreadedEventLoopGroup(numberOfThreads: 4)
let client = try MongoClient(using: elg)
let exampleDB = client.db("example")
let inventory = exampleDB.collection("inventory")

// Open a stream, consume one event, kill the stream, then open a second stream
// that resumes from the first stream's resume token.
let resumed = inventory.watch().flatMap { firstStream -> EventLoopFuture<ChangeStream<ChangeStreamEvent<BSONDocument>>> in
    // read the first change event
    let firstEventRead = firstStream.next()
    // simulate an error by killing the stream
    let killed = firstEventRead.flatMap { _ in
        firstStream.kill()
    }
    return killed.flatMap { _ in
        // create a new change stream that starts after the first change event
        let resumeToken = firstStream.resumeToken
        return inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken))
    }
}

let processing = resumed.flatMap { resumedStream in
    resumedStream.forEach { change in
        // process `ChangeStreamEvent<BSONDocument>` here
    }
}
processing.whenFailure { failure in
    // handle error
}

// perform some operations using `inventory`...
Modify Change Stream Output
// Create the event loop group handed to the client, then look up the collection to watch.
let elg = MultiThreadedEventLoopGroup(numberOfThreads: 4)
let client = try MongoClient(using: elg)
let exampleDB = client.db("example")
let inventory = exampleDB.collection("inventory")

// Only include events where the changed document's username = "alice"
let matchAlice: BSONDocument = ["$match": ["fullDocument.username": "alice"]]
let pipeline: [BSONDocument] = [matchAlice]

// `watch(pipeline)` resolves to a `ChangeStream<ChangeStreamEvent<BSONDocument>>`.
let processing = inventory.watch(pipeline).flatMap { changeStream in
    changeStream.forEach { change in
        // process `ChangeStreamEvent<BSONDocument>` here
    }
}
processing.whenFailure { failure in
    // handle error
}

// perform some operations using `inventory`...