Quick Start Primer
This guide provides background on the MongoDB Reactive Streams driver and its asynchronous API before moving on to how to use the driver with MongoDB.
Note
See the installation guide for instructions on how to install the MongoDB Reactive Streams Driver.
Reactive Streams
This library is an implementation of the Reactive Streams specification. The Reactive Streams API consists of the following components:
- Publisher
- Subscriber
- Subscription
A Publisher is a provider of a potentially unbounded number of sequenced elements, published according to the demand received from its Subscriber(s).
In response to a call to Publisher.subscribe(Subscriber), the possible invocation sequences for methods on the Subscriber are given by the following protocol:
onSubscribe onNext* (onError | onComplete)?
This means that onSubscribe is always signalled, followed by a possibly unbounded number of onNext signals (as requested by the Subscriber), followed by an onError signal if there is a failure or an onComplete signal when no more elements are available, all as long as the Subscription is not cancelled.
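The signal order described above can be seen with a small, self-contained sketch. It is not driver code: it assumes the JDK's java.util.concurrent.Flow interfaces (Java 9+) as a stand-in for the org.reactivestreams interfaces, and SignalOrderDemo and RangePublisher are hypothetical names. The publisher is deliberately simplified and omits parts of the specification such as validating the requested amount.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Sketch only: java.util.concurrent.Flow stands in for org.reactivestreams.
// All class names here are hypothetical and not part of the driver.
class SignalOrderDemo {

    // A tiny synchronous publisher that emits the integers 1..count on demand.
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) {
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private boolean done;

                @Override
                public void request(long n) {
                    // Emit at most n elements, then complete once exhausted.
                    for (long i = 0; i < n && !done && next <= count; i++) {
                        subscriber.onNext(next++);
                    }
                    if (!done && next > count) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true; // no further signals after cancellation
                }
            });
        }
    }

    // Subscribes once and records every signal in the order it arrives.
    static List<String> run(int count) {
        List<String> signals = new ArrayList<>();
        new RangePublisher(count).subscribe(new Flow.Subscriber<Integer>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                signals.add("onSubscribe");
                subscription.request(Long.MAX_VALUE); // ask for everything
            }

            @Override
            public void onNext(Integer item) {
                signals.add("onNext(" + item + ")");
            }

            @Override
            public void onError(Throwable t) {
                signals.add("onError");
            }

            @Override
            public void onComplete() {
                signals.add("onComplete");
            }
        });
        return signals;
    }

    public static void main(String[] args) {
        // prints [onSubscribe, onNext(1), onNext(2), onNext(3), onComplete]
        System.out.println(run(3));
    }
}
```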
For more information about Reactive Streams, see http://www.reactive-streams.org.
Subscribers
The MongoDB Reactive Streams Driver API mirrors the Sync driver API, and any method that causes network I/O returns a Publisher<T>, where T is the type of response for the operation.
Note
All Publishers returned from the API are cold, meaning that no I/O happens until they are subscribed to and the subscription makes a request. Simply creating a Publisher won’t cause any network I/O; the driver executes the operation only when Subscription.request() is called.
Publishers in this implementation are unicast. Each Subscription to a Publisher relates to a single MongoDB operation, and its Subscriber will receive its own specific set of results.
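To illustrate what cold and unicast mean in practice, here is a minimal sketch under stated assumptions. It again uses java.util.concurrent.Flow in place of org.reactivestreams, and ColdPublisher and ManualSubscriber are hypothetical classes, not the driver's implementation. The "operation" is just a Supplier that counts how often it runs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Sketch only: hypothetical classes built on java.util.concurrent.Flow.
class ColdPublisherDemo {

    // Single-result publisher: runs `operation` only when a subscription
    // first requests data, and once per subscription (unicast).
    static final class ColdPublisher<T> implements Flow.Publisher<T> {
        private final Supplier<T> operation;

        ColdPublisher(Supplier<T> operation) {
            this.operation = operation;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super T> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private boolean finished;

                @Override
                public void request(long n) {
                    if (finished || n <= 0) {
                        return; // simplified: the spec requires onError for n <= 0
                    }
                    finished = true;
                    T result;
                    try {
                        result = operation.get(); // the work happens here, not earlier
                    } catch (RuntimeException e) {
                        subscriber.onError(e);
                        return;
                    }
                    subscriber.onNext(result);
                    subscriber.onComplete();
                }

                @Override
                public void cancel() {
                    finished = true;
                }
            });
        }
    }

    // Stores its subscription so the caller decides when to request.
    static final class ManualSubscriber implements Flow.Subscriber<String> {
        Flow.Subscription subscription;
        final List<String> received = new ArrayList<>();

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
        @Override public void onNext(String item) { received.add(item); }
        @Override public void onError(Throwable t) { received.add("error"); }
        @Override public void onComplete() { received.add("done"); }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        AtomicInteger executions = new AtomicInteger();
        Flow.Publisher<String> publisher =
                new ColdPublisher<String>(() -> "result-" + executions.incrementAndGet());
        log.add("after create: " + executions.get());

        ManualSubscriber first = new ManualSubscriber();
        ManualSubscriber second = new ManualSubscriber();
        publisher.subscribe(first);
        publisher.subscribe(second);
        log.add("after subscribe: " + executions.get());

        first.subscription.request(1);  // first execution of the operation
        second.subscription.request(1); // independent second execution
        log.add("first: " + first.received);
        log.add("second: " + second.received);
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this sketch the counter stays at 0 after creating the publisher and after subscribing, and each subscription triggers its own execution once it requests data.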
Subscribers used in the Quick Start
For the Quick Start we have implemented a couple of different Subscribers. Although blocking is an artificial scenario for reactive streams, we block on the results of one example before starting the next, so as to ensure the state of the database.
- ObservableSubscriber: The base subscriber class is ObservableSubscriber<T>, a Subscriber that stores the results of the Publisher<T>. It also contains an await() method so we can block for results to ensure the state of the database before going on to the next example.
- OperationSubscriber: An implementation of the ObservableSubscriber that immediately calls Subscription.request when it’s subscribed to.
- PrintSubscriber: An implementation of the OperationSubscriber that prints a message on Subscriber.onComplete.
- ConsumerSubscriber: An implementation of the OperationSubscriber that takes a Consumer and calls Consumer.accept(result) when Subscriber.onNext(T result) is called.
- PrintToStringSubscriber: An implementation of the ConsumerSubscriber that prints the string version of the result on Subscriber.onNext().
- PrintDocumentSubscriber: An implementation of the ConsumerSubscriber that prints the JSON version of a Document on Subscriber.onNext().
Blocking and non-blocking examples
As our subscribers contain a latch that is only released when the onComplete method of the Subscriber is called, we can block on that latch by calling the await method. Below are two examples using our auto-requesting PrintDocumentSubscriber.
The first is non-blocking and the second blocks waiting for the Publisher to complete:
// Create a publisher
Publisher<Document> publisher = collection.find();
// Non blocking
publisher.subscribe(new PrintDocumentSubscriber());
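// Blocking: declare the variable as ObservableSubscriber, because await() is not part of the Subscriber interface
ObservableSubscriber<Document> subscriber = new PrintDocumentSubscriber();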
publisher.subscribe(subscriber);
subscriber.await(); // Block for the publisher to complete
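For readers curious how a latch-based subscriber of this kind might be put together, here is a minimal sketch. It is not the Quick Start's actual source: LatchSubscriber is a hypothetical class, java.util.concurrent.Flow stands in for org.reactivestreams, and the JDK's SubmissionPublisher plays the role of an asynchronous publisher. As a design choice of this sketch, the latch is also released on onError so that a failed operation cannot block the caller forever.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Sketch only: a hypothetical latch-based subscriber, not the driver's class.
class LatchSubscriberDemo {

    static class LatchSubscriber<T> implements Flow.Subscriber<T> {
        private final List<T> received = new CopyOnWriteArrayList<>();
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile Throwable error;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE); // auto-request everything
        }

        @Override
        public void onNext(T item) {
            received.add(item);
        }

        @Override
        public void onError(Throwable t) {
            error = t;
            latch.countDown(); // release waiters on failure too
        }

        @Override
        public void onComplete() {
            latch.countDown();
        }

        // Blocks until the publisher terminates, then returns the results.
        List<T> await(long timeout, TimeUnit unit) {
            try {
                if (!latch.await(timeout, unit)) {
                    throw new IllegalStateException("Publisher timed out");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting", e);
            }
            if (error != null) {
                throw new IllegalStateException("Publisher failed", error);
            }
            return received;
        }
    }

    static List<String> run() {
        LatchSubscriber<String> subscriber = new LatchSubscriber<>();
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        publisher.subscribe(subscriber);
        publisher.submit("a");
        publisher.submit("b");
        publisher.close(); // signals onComplete after buffered items are delivered
        return subscriber.await(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [a, b]
    }
}
```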
Publishers, Subscribers and Subscriptions
In general, Publishers, Subscribers and Subscriptions are a low-level API, and it’s expected that users and libraries will build more expressive APIs upon them rather than solely use these interfaces. Because this library implements only these interfaces, users will benefit from this growing ecosystem, which is a core design principle of reactive streams.
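As a rough illustration of building a more expressive API on top of these interfaces, the sketch below adapts a publisher to a CompletableFuture so that results can be transformed with ordinary future combinators. The toFuture helper and the PublisherAdapters class are hypothetical, and java.util.concurrent.Flow with SubmissionPublisher again stands in for org.reactivestreams and a real driver publisher.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.stream.Collectors;

// Sketch only: a hypothetical adapter, not part of the driver.
class PublisherAdapters {

    // Collects every element of the publisher into a list and completes
    // the returned future when the publisher terminates.
    static <T> CompletableFuture<List<T>> toFuture(Flow.Publisher<T> publisher) {
        CompletableFuture<List<T>> future = new CompletableFuture<>();
        publisher.subscribe(new Flow.Subscriber<T>() {
            private final List<T> items = new ArrayList<>();

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(T item) {
                items.add(item);
            }

            @Override
            public void onError(Throwable t) {
                future.completeExceptionally(t);
            }

            @Override
            public void onComplete() {
                future.complete(items);
            }
        });
        return future;
    }

    static List<Integer> run() {
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
        CompletableFuture<List<Integer>> future = toFuture(publisher);
        publisher.submit(1);
        publisher.submit(2);
        publisher.submit(3);
        publisher.close(); // triggers onComplete once the items are delivered

        // Callers now work with the future instead of raw signals.
        return future
                .thenApply(list -> list.stream().map(i -> i * 10).collect(Collectors.toList()))
                .join();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [10, 20, 30]
    }
}
```

Collecting everything into a list gives up backpressure, so an adapter like this only suits result sets that are known to be small.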