Quick Tour Primer

This guide provides background on the RxJava driver and its asynchronous API before looking at how to use the driver with MongoDB.

Note

See the installation guide for instructions on how to install the MongoDB RxJava Driver.

Reactive Extensions

This library provides support for ReactiveX (Reactive Extensions) by using the RxJava library. All database calls return an Observable, to which an Observer can subscribe. The Observer reacts to whatever item or sequence of items the Observable emits. This pattern facilitates concurrent operations: the caller does not block while waiting for the Observable to emit items, but instead creates a sentry in the form of an Observer that stands ready to react whenever the Observable does emit.

For more information about Reactive Extensions and Observables, see http://reactivex.io.

From Async Callbacks to Observables

The MongoDB RxJava Driver is built upon the MongoDB Async driver, which is callback driven. The API mirrors the Async driver API, and any method that causes network I/O returns either an Observable<T> or a MongoObservable<T>, where T is the type of response for the operation.
The exception to that rule is for methods in the Async driver that return a Void value in the callback. As an Observable<Void> is generally considered bad practice, in these circumstances we return an Observable<Success> for the operation.
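
The mapping from a Void callback to a Success item can be illustrated with a small sketch that uses only the Java standard library. It is not the driver's code: the ResultCallback interface, the dropCollection operation, and the local Success enum are hypothetical stand-ins, and three plain functions play the role of an Observer's onNext, onError, and onCompleted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Local stand-in for a "completed without a value" marker (assumption, not the driver's type). */
enum Success { SUCCESS }

/** Hypothetical callback shape: exactly one of result/error is meaningful. */
interface ResultCallback<T> {
    void onResult(T result, Throwable error);
}

final class CallbackAdapterDemo {
    /** Hypothetical callback-style operation with no meaningful result. */
    static void dropCollection(ResultCallback<Void> callback) {
        callback.onResult(null, null); // a Void result can only ever be null
    }

    /** Adapts the Void callback to observer-style events, emitting SUCCESS instead of null. */
    static void dropCollectionObserved(Consumer<Success> onNext,
                                       Consumer<Throwable> onError,
                                       Runnable onCompleted) {
        dropCollection((ignored, error) -> {
            if (error != null) {
                onError.accept(error);
            } else {
                onNext.accept(Success.SUCCESS);
                onCompleted.run();
            }
        });
    }

    /** Records the events an observer would see for a successful operation. */
    static List<String> run() {
        List<String> events = new ArrayList<>();
        dropCollectionObserved(
            item -> events.add("onNext(" + item + ")"),
            error -> events.add("onError"),
            () -> events.add("onCompleted"));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Emitting a concrete marker value means the observer never has to handle a null item, which is the practical problem with a stream of Void.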

MongoObservable

In RxJava, Observable is a class rather than an interface, so where the MongoDB Async Driver API follows a fluent interface pattern we return a MongoObservable<T>. The MongoObservable<T> mirrors the underlying fluent API and provides two extra methods:

  1. toObservable()

    Returns an Observable<T> instance for the operation.

  2. subscribe(Subscriber<? super T> subscriber)

    Subscribes to the Observable.

Note

All Observables returned from the API are cold, meaning that no I/O happens until they are subscribed to. As such, an observer is guaranteed to see the whole sequence from the beginning. Creating an Observable therefore causes no network I/O; the driver executes the operation only once a Subscriber has subscribed and Subscriber.request() has been called.

Observables in this implementation are unicast: each subscription to an Observable relates to a single MongoDB operation, and its Subscriber receives its own set of results.
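
The cold and unicast behaviour described in this note can be modelled with a toy class built on the Java standard library. This is only a sketch under stated assumptions: ColdSource and ColdSourceDemo are hypothetical names, a counter stands in for the network round trip, and the model ignores request() and backpressure entirely.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Toy stand-in for a cold source: the operation runs once per subscription. */
final class ColdSource<T> {
    private final Supplier<List<T>> operation;

    ColdSource(Supplier<List<T>> operation) {
        this.operation = operation; // nothing is executed here
    }

    /** Runs the operation now and hands every result to this one subscriber. */
    void subscribe(Consumer<T> onNext) {
        for (T item : operation.get()) {
            onNext.accept(item);
        }
    }
}

final class ColdSourceDemo {
    /** Returns how often the simulated operation ran after zero, one, and two subscriptions. */
    static int[] executionCounts() {
        AtomicInteger executions = new AtomicInteger();
        ColdSource<String> source = new ColdSource<>(() -> {
            executions.incrementAndGet(); // stands in for the network round trip
            return Arrays.asList("doc-1", "doc-2");
        });
        int beforeSubscribe = executions.get();

        List<String> first = new ArrayList<>();
        source.subscribe(first::add);
        int afterFirst = executions.get();

        List<String> second = new ArrayList<>();
        source.subscribe(second::add);
        int afterSecond = executions.get();

        return new int[] {beforeSubscribe, afterFirst, afterSecond};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(executionCounts()));
    }
}
```

In this model, constructing the source does no work, and each call to subscribe triggers its own execution and delivers its own copy of the results.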

Subscribers used in the Quick Tour

For the Quick Tour we use RxJava's TestSubscriber. Although blocking is an artificial scenario for reactive extensions, we generally block on the results of one example before starting the next, so as to ensure the state of the database. SubscriberHelpers.java provides two static helpers:

  1. printSubscriber

    Prints each value emitted by the Observable, along with an optional initial message.

  2. printDocumentSubscriber

    Prints the JSON version of each Document emitted by the Observable.

Blocking and non-blocking examples

The TestSubscriber contains a latch that is released only when the Subscriber receives a terminal event (onCompleted or onError), so we can block on that latch by calling subscriber.awaitTerminalEvent(). Below are two examples using our auto-requesting printDocumentSubscriber.
The first is non-blocking and the second blocks until the Observable completes:

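// Create an Observable (assumes `collection` is an existing MongoCollection<Document>)
Observable<Document> observable = collection.find().toObservable();

// Non-blocking: results are printed as they arrive
observable.subscribe(printDocumentSubscriber());

// Blocking: keep a reference to the TestSubscriber so that we can wait on it
TestSubscriber<Document> subscriber = printDocumentSubscriber();
observable.subscribe(subscriber);
subscriber.awaitTerminalEvent(); // Block until the Observable terminates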
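
The latch mechanism behind the blocking example can be sketched with the Java standard library alone. The code below is not RxJava's TestSubscriber: LatchSubscriber and LatchDemo are hypothetical names, the awaitTerminalEvent method here only borrows the name used above, and a plain thread stands in for the driver delivering results asynchronously.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Toy subscriber that records items and releases a latch on a terminal event. */
final class LatchSubscriber<T> {
    private final CountDownLatch latch = new CountDownLatch(1);
    private final List<T> received = Collections.synchronizedList(new ArrayList<T>());
    private volatile Throwable error;

    void onNext(T item) { received.add(item); }

    void onError(Throwable t) { error = t; latch.countDown(); }

    void onCompleted() { latch.countDown(); }

    /** Blocks until onCompleted or onError was called; returns false on timeout. */
    boolean awaitTerminalEvent(long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    List<T> getReceived() { return received; }

    Throwable getError() { return error; }
}

final class LatchDemo {
    /** Emits two items from another thread and blocks until the terminal event. */
    static List<String> runBlocking() {
        LatchSubscriber<String> subscriber = new LatchSubscriber<>();
        Thread producer = new Thread(() -> {
            subscriber.onNext("doc-1");
            subscriber.onNext("doc-2");
            subscriber.onCompleted();
        });
        producer.start();
        if (!subscriber.awaitTerminalEvent(5, TimeUnit.SECONDS)) {
            throw new IllegalStateException("timed out waiting for completion");
        }
        return subscriber.getReceived();
    }

    public static void main(String[] args) {
        System.out.println(runBlocking());
    }
}
```

Without the call to awaitTerminalEvent, runBlocking could return before the producer thread had delivered anything, which is the situation the blocking examples in the Quick Tour are designed to avoid.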