- Getting Started
- Quick Tour Primer
Quick Tour Primer
The following code snippets come from the SubscriberHelpers.java
example code
that can be found with the examples source.
Note
See the installation guide for instructions on how to install the MongoDB RxJava Driver.
Reactive Extensions
This library provides support for ReactiveX (Reactive Extensions) by using the
RxJava library. All database calls return an Observable
which an Observer
can subscribe to.
That Observer
reacts to whatever item or sequence of items the Observable
emits. This pattern facilitates concurrent operations
because an application thread is not blocked while waiting for the Observable
to emit objects, but instead it creates a sentry
in the form of an Observer
that stands ready to react appropriately at whatever future time the Observable
does so.
For more information about Reactive Extensions and Observables go to: http://reactivex.io.
From Async Callbacks to Observables
The MongoDB RxJava Driver is built upon the callback-driven MongoDB Async driver.
The API mirrors the Async driver API and any methods that cause network I/O return either a Observable<T>
or a MongoObservable<T>
where T
is the type of response for the operation.
The exception to that rule is for methods in the async driver that return a Void
value in the callback.
As an Observable<Void>
is generally considered bad practise, in these circumstances we
return an Observable<Success>
for the operation.
MongoObservable
In RxJava Observable
is not an interface, so where the MongoDB Async Driver API follows a fluent interface pattern we return a
MongoObservable<T>
. The MongoObservable<T>
mirrors the underlying fluent API and provides two extra methods:
toObservable()
Returns an
Observable<T>
instance for the operation.subscribe(Subscriber<? super TResult> subscriber)
Creates an
Observable<T>
, subscribes to it, and returns theSubscription
Note
All Observables
returned
from the API are cold, meaning that no I/O happens until they are subscribed to. As such an observer is guaranteed to see the whole
sequence from the beginning. So just creating an Observable
won’t cause any network IO, and it’s not until Subscriber.request()
is called
that the driver executes the operation.
Publishers in this implementation are unicast. Each Subscription
to an Observable
relates to a single MongoDB operation and its ‘Subscriber’
will receive its own specific set of results.
Subscribers used in the Quick Tour
For the Quick Tour we use RxJava’s TestSubscriberSubscriberHelpers.java
provides
two static helpers:
printSubscriber()
Prints each value emitted by the
Observable
, along with an optional initial message.printDocumentSubscriber()
Prints the JSON representation of each
Document
emitted by theObservable
.
Blocking and non blocking examples
As the TestSubscriber
contains a latch that is only released when the onCompleted
method of the Subscriber
is called,
we can use that latch to block on by calling the subscriber.awaitTerminalEvent()
method. Below are two examples using our auto-requesting PrintDocumentSubscriber
.
The first is non-blocking and the second blocks waiting for the Publisher
to complete:
// Create a publisher
Observable<Document> observable = collection.find().toObservable();
// Non blocking
observable.subscribe(printDocumentSubscriber());
Subscriber<Document> subscriber = printDocumentSubscriber();
observable.subscribe(subscriber);
// Block for the publisher to complete
subscriber.awaitTerminalEvent();