- Getting Started
- Quick Tour Primer
Quick Tour Primer
The aim of this guide is to provide background about the Scala driver and its asynchronous API before going onto looking at how to use the driver and MongoDB.
Note
See the installation guide for instructions on how to install the MongoDB RxJava Driver.
Reactive Extensions
This library provides support for ReactiveX (Reactive Extensions) by using the
RxJava library. All database calls return an Observable
which an Observer
can subscribe to.
That Observer
reacts to whatever item or sequence of items the Observable
emits. This pattern facilitates concurrent operations
because it does not need to block while waiting for the Observable
to emit objects, but instead it creates a sentry in the form of
an Observer
that stands ready to react appropriately at whatever future time the Observable
does so.
For more information about Reactive Extensions and Observables go to: http://reactivex.io.
From Async Callbacks to Observables
The MongoDB RxJava Driver is built upon the MongoDB Async driver which is callback driven.
The API mirrors the Async driver API and any methods that cause network IO return either a Observable<T>
or a MongoObservable<T>
where T
is the type of response for the operation.
The exception to that rule is for methods in the async driver that return a Void
value in the callback.
As an Observable<Void>
is generally considered bad practise, in these circumstances we
return a Observable<Success>
for the operation.
MongoObservable
In RxJava Observable
is not an interface, so where the MongoDB Async Driver API follows a fluent interface pattern we return a
MongoObservable<T>
. The MongoObservable<T>
mirrors the underlying fluent API and provides two extra methods:
toObservable()
Returns an
Observable<T>
instance for the operation.subscribe(Subscriber<? super TResult> subscriber)
Subscribes to the
Observable
.
Note
All Observables
returned
from the API are cold, meaning that no I/O happens until they are subscribed to. As such an observer is guaranteed to see the whole
sequence from the beginning. So just creating an Observable
won’t cause any network IO, and it’s not until Subscriber.request()
is called
that the driver executes the operation.
Publishers in this implementation are unicast. Each Subscription
to an Observable
relates to a single MongoDB operation and its ‘Subscriber’
will receive its own specific set of results.
Subscribers used in the Quick Tour
For the Quick Tour we use RxJava’s TestSubscriberSubscriberHelpers.java
provides
two static helpers:
printSubscriber
Prints each value emitted by the
Observable
, along with an optional initial message.printDocumentSubscriber
Prints the json version of each
Document
emitted but theObservable
.
Blocking and non blocking examples
As the TestSubscriber
contains a latch that is only released when the onCompleted
method of the Subscriber
is called,
we can use that latch to block on by calling the subscriber.awaitTerminalEvent()
method. Below are two examples using our auto-requesting PrintDocumentSubscriber
.
The first is non-blocking and the second blocks waiting for the Publisher
to complete:
// Create a publisher
Observable<Document> observable = collection.find().toObservable();
// Non blocking
observable.subscribe(printDocumentSubscriber());
Subscriber<Document> subscriber = printDocumentSubscriber();
observable.subscribe(subscriber);
subscriber.awaitTerminalEvent(); // Block for the publisher to complete