Observables
The MongoDB Scala Driver is an asynchronous, non-blocking driver. With the Observable model, asynchronous events become simple, composable operations, freed from the complexity of nested callbacks.
For asynchronous operations there are three interfaces: Observable, Subscription and Observer.
Note
The interfaces are similar to the Publisher, Subscription and Subscriber interfaces from the reactive streams JVM implementation. However, we prefer the name Observable to Publisher and Observer to Subscriber for readability.
Observable
An Observable represents a MongoDB operation that emits its results to the Observer based on the demand requested through the Subscription to the Observable.
Subscription
A Subscription represents the one-to-one lifecycle of an Observer subscribing to an Observable. A Subscription to an Observable can only be used by a single Observer. The purpose of a Subscription is to control demand and to allow unsubscribing from the Observable.
Observer
An Observer provides the mechanism for receiving push-based notifications from the Observable. Demand for these events is signalled by its Subscription.
On subscription to an Observable[TResult], the Observer is passed the Subscription via the onSubscribe(subscription: Subscription) method. Demand for results is signalled via the Subscription, and any results are passed to the onNext(result: TResult) method. If there is an error for any reason, onError(e: Throwable) is called and no more events are passed to the Observer. Alternatively, when the Observer has consumed all the results from the Observable, the onComplete() method is called.
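The lifecycle described above can be illustrated without a database. The following is a minimal, single-threaded sketch in plain Scala, not the driver's implementation: ToySubscription, ToyObserver, ToyObservable, SeqObservable, BatchingObserver and ProtocolSketch are all hypothetical names invented for this illustration. It shows an in-memory source that emits only as many items as have been requested, and an observer that records every call it makes or receives.

```scala
import scala.collection.mutable.ListBuffer

// Toy stand-ins for the three interfaces (hypothetical, NOT the driver's types).
trait ToySubscription {
  def request(n: Long): Unit
  def unsubscribe(): Unit
}

trait ToyObserver[T] {
  def onSubscribe(subscription: ToySubscription): Unit
  def onNext(result: T): Unit
  def onError(e: Throwable): Unit
  def onComplete(): Unit
}

trait ToyObservable[T] {
  def subscribe(observer: ToyObserver[T]): Unit
}

// In-memory source that emits items only while there is outstanding demand.
final class SeqObservable[T](items: Seq[T]) extends ToyObservable[T] {
  override def subscribe(observer: ToyObserver[T]): Unit = {
    val iterator = items.iterator
    var cancelled = false
    var finished = false
    var emitting = false // guards against re-entrant request() calls from onNext
    var pending = 0L

    val subscription = new ToySubscription {
      override def request(n: Long): Unit =
        if (!cancelled && !finished) {
          pending += n
          if (!emitting) {
            emitting = true
            try {
              while (pending > 0 && iterator.hasNext && !cancelled) {
                pending -= 1
                observer.onNext(iterator.next())
              }
            } finally emitting = false
            // Signal completion once the source is exhausted.
            if (!iterator.hasNext && !cancelled) {
              finished = true
              observer.onComplete()
            }
          }
        }

      override def unsubscribe(): Unit = cancelled = true
    }

    // The observer receives its Subscription first; nothing is emitted until it requests.
    observer.onSubscribe(subscription)
  }
}

// Observer that requests results in fixed-size batches and records each event.
final class BatchingObserver[T](batchSize: Long) extends ToyObserver[T] {
  val events: ListBuffer[String] = ListBuffer.empty[String]
  private var seen = 0L
  private var subscription: Option[ToySubscription] = None

  override def onSubscribe(s: ToySubscription): Unit = {
    subscription = Some(s)
    events += s"request($batchSize)"
    s.request(batchSize)
  }

  override def onNext(result: T): Unit = {
    events += s"onNext($result)"
    seen += 1
    if (seen == batchSize) {
      seen = 0
      events += s"request($batchSize)"
      subscription.foreach(_.request(batchSize))
    }
  }

  override def onError(e: Throwable): Unit = events += s"onError(${e.getMessage})"

  override def onComplete(): Unit = events += "onComplete"
}

object ProtocolSketch {
  // Subscribes a batching observer to an in-memory source and returns the event log.
  def run(items: Seq[Int], batchSize: Long): List[String] = {
    val observer = new BatchingObserver[Int](batchSize)
    new SeqObservable(items).subscribe(observer)
    observer.events.toList
  }

  def main(args: Array[String]): Unit = run(1 to 5, 2).foreach(println)
}
```

Running ProtocolSketch prints the event log in order, which makes it easy to see that no onNext call happens before a matching request and that onComplete arrives only after the last result.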
Back Pressure
In the following example, the Subscription is used to control demand when iterating an Observable. The default Observer implementation automatically requests all the data. Below we override the onSubscribe method with a custom implementation so that we can manage the demand-driven iteration of the Observable:
// Assumes `collection` is an existing MongoCollection[Document]
collection.find().subscribe(new Observer[Document]() {
  var batchSize: Long = 10
  var seen: Long = 0
  var subscription: Option[Subscription] = None

  // Keep the Subscription and request the first batch
  override def onSubscribe(subscription: Subscription): Unit = {
    this.subscription = Some(subscription)
    subscription.request(batchSize)
  }

  // Print each result; once a full batch has been seen, request the next one
  override def onNext(result: Document): Unit = {
    println(result.toJson())
    seen += 1
    if (seen == batchSize) {
      seen = 0
      subscription.get.request(batchSize)
    }
  }

  override def onError(e: Throwable): Unit = println(s"Error: $e")

  override def onComplete(): Unit = println("Completed")
})
Observable Helpers
The org.mongodb.scala package provides improved interaction with the Java Observable class via the ScalaObservable implicit class. The extended functionality includes simple subscription via anonymous functions:
// Subscribe with custom onNext:
collection.find().subscribe((doc: Document) => println(doc.toJson()))

// Subscribe with custom onNext and onError:
collection.find().subscribe((doc: Document) => println(doc.toJson()),
                            (e: Throwable) => println(s"There was an error: $e"))

// Subscribe with custom onNext, onError and onComplete:
collection.find().subscribe((doc: Document) => println(doc.toJson()),
                            (e: Throwable) => println(s"There was an error: $e"),
                            () => println("Completed!"))
The ScalaObservable implicit class also provides monadic operators that make chaining and working with Observable instances simpler, for example:
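import scala.util.{Failure, Success}

// GenerateHtmlObservable, renderHtml and renderHttp500 stand in for application code
GenerateHtmlObservable().andThen({
  case Success(html: String) => renderHtml(html)
  case Failure(t) => renderHttp500
})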
The full list of available monadic operators is:
- andThen: Allows the chaining of Observables.
- collect: Collects all the results into a sequence.
- fallbackTo: Allows falling back to an alternative Observable if there is a failure.
- filter: Filters results of the Observable.
- flatMap: Creates a new Observable by applying a function to each result of the Observable.
- foldLeft: Creates a new Observable that contains the single result of the applied accumulator function.
- foreach: Applies a function to each emitted result.
- head: Returns the head of the Observable in a Future.
- map: Creates a new Observable by applying a function to each emitted result of the Observable.
- recover: Creates a new Observable that will handle any matching throwable that this Observable might contain.
- recoverWith: Creates a new Observable that will handle any matching throwable that this Observable might contain by assigning it a value of another Observable.
- toFuture: Collects the Observable results and converts them to a Future.
- transform: Creates a new Observable by applying the resultFunction function to each emitted result.
- withFilter: Provides for-comprehension support for Observables.
- zip: Zips the values of this and that Observable, and creates a new Observable holding the tuple of their results.
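As a rough illustration of how a few of these operators chain together, here is a small sketch that needs the driver on the classpath but no running server. It rests on some assumptions: that Observable(...) in org.mongodb.scala builds an Observable from an in-memory Iterable, and that toFuture() on a plain Observable yields a Future of a sequence. OperatorSketch, evenSquares and collectBlocking are hypothetical names used only here.

```scala
import org.mongodb.scala._
import scala.concurrent.Await
import scala.concurrent.duration._

object OperatorSketch {
  // Keeps the even numbers and squares them; filter and map are supplied
  // by the ScalaObservable implicit class.
  def evenSquares(source: Observable[Int]): Observable[Int] =
    source.filter(_ % 2 == 0).map(n => n * n)

  // Blocks on the Future only to keep the demo short;
  // application code would normally stay asynchronous.
  def collectBlocking[T](source: Observable[T]): Seq[T] =
    Await.result(source.toFuture(), 10.seconds)

  def main(args: Array[String]): Unit = {
    // Assumption: Observable(...) wraps an in-memory Iterable.
    val numbers: Observable[Int] = Observable(1 to 6)
    println(collectBlocking(evenSquares(numbers)))
  }
}
```

The same chain should apply to the result of collection.find(), with the Int functions swapped for functions over Document.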