- Getting Started
- Quick Tour Primer
Quick Tour Primer
The following code snippets come from the SubscriberHelpers.java example code
that can be found with the examples source.
Note
See the installation guide for instructions on how to install the MongoDB RxJava Driver.
Reactive Extensions
This library provides support for ReactiveX (Reactive Extensions) by using the
RxJava library. All database calls return an Observable which an Observer can subscribe to.
That Observer reacts to whatever item or sequence of items the Observable emits. This pattern facilitates concurrent operations
because an application thread is not blocked while waiting for the Observable to emit objects, but instead it creates a sentry
in the form of an Observer that stands ready to react appropriately at whatever future time the Observable does so.
For more information about Reactive Extensions and Observables go to: http://reactivex.io.
From Async Callbacks to Observables
The MongoDB RxJava Driver is built upon the callback-driven MongoDB Async driver.
The API mirrors the Async driver API and any methods that cause network I/O return either a Observable<T> or a MongoObservable<T>
where T is the type of response for the operation.
The exception to that rule is for methods in the async driver that return a Void value in the callback.
As an Observable<Void> is generally considered bad practise, in these circumstances we
return an Observable<Success> for the operation.
MongoObservable
In RxJava Observable is not an interface, so where the MongoDB Async Driver API follows a fluent interface pattern we return a
MongoObservable<T>. The MongoObservable<T> mirrors the underlying fluent API and provides two extra methods:
toObservable()Returns an
Observable<T>instance for the operation.subscribe(Subscriber<? super TResult> subscriber)Creates an
Observable<T>, subscribes to it, and returns theSubscription
Note
All Observables returned
from the API are cold, meaning that no I/O happens until they are subscribed to. As such an observer is guaranteed to see the whole
sequence from the beginning. So just creating an Observable won’t cause any network IO, and it’s not until Subscriber.request() is called
that the driver executes the operation.
Publishers in this implementation are unicast. Each Subscription
to an Observable relates to a single MongoDB operation and its ‘Subscriber’
will receive its own specific set of results.
Subscribers used in the Quick Tour
For the Quick Tour we use RxJava’s TestSubscriberSubscriberHelpers.java provides
two static helpers:
printSubscriber()Prints each value emitted by the
Observable, along with an optional initial message.printDocumentSubscriber()Prints the JSON representation of each
Documentemitted by theObservable.
Blocking and non blocking examples
As the TestSubscriber contains a latch that is only released when the onCompleted method of the Subscriber is called,
we can use that latch to block on by calling the subscriber.awaitTerminalEvent() method. Below are two examples using our auto-requesting PrintDocumentSubscriber.
The first is non-blocking and the second blocks waiting for the Publisher to complete:
// Create a publisher
Observable<Document> observable = collection.find().toObservable();
// Non blocking
observable.subscribe(printDocumentSubscriber());
Subscriber<Document> subscriber = printDocumentSubscriber();
observable.subscribe(subscriber);
// Block for the publisher to complete
subscriber.awaitTerminalEvent();