- Reactive Streams
- Quick Start Primer
Quick Start Primer
The aim of this guide is to provide background about the MongoDB Reactive Streams driver and its asynchronous API before going onto looking at how to use the driver and MongoDB.
Note
See the installation guide for instructions on how to install the MongoDB Reactive Streams Driver.
Reactive Streams
This library is an implementation of the reactive streams specification and the reactive stream API consists of the following components:
- Publisher
- Subscriber
- Subscription
A Publisher
is a provider of a potentially unbounded number of sequenced elements, published according to the demand received from it’s Subscriber(s)
.
In response to a call to Publisher.subscribe(Subscriber)
the possible invocation sequences for methods on the Subscriber
are given by the following protocol:
onSubscribe onNext* (onError | onComplete)?
This means that onSubscribe
is always signalled, followed by a possibly unbounded number of onNext
signals (as requested by Subscriber
)
followed by an onError
signal if there is a failure, or an onComplete
signal when no more elements are available—all as long as
the Subscription
is not cancelled.
For more information about reactive streams go to: http://www.reactive-streams.org.
Subscribers
The MongoDB Reactive Streams Driver API mirrors the Sync driver API and any methods that cause network IO return a Publisher<T>
,
where T
is the type of response for the operation.
Note
All Publishers
returned
from the API are cold, meaning that nothing happens until
they are subscribed to. So just creating a Publisher
won’t cause any network IO. It’s not until Publisher.subscribe
is called that
the driver executes the operation.
Publishers in this implementation are unicast. Each
Subscription
to a Publisher
relates to a single MongoDB operation and its
‘Subscriber’ will receive its own
specific set of results.
Subscribers used in the Quick Start
For the Quick Start we have implemented a couple of different Subscribers and although this is an artificial scenario for reactive streams we do block on the results of one example before starting the next, so as to ensure the state of the database.
ObservableSubscriber
The base subscriber class is the
ObservableSubscriber<T>
, a Subscriber that stores the results of thePublisher<T>
. It also contains anawait()
method so we can block for results to ensure the state of the database before going on to the next example.OperationSubscriber
An implementation of the
ObservableSubscriber
that immediately callsSubscription.request
when it’s subscribed to.PrintSubscriber
An implementation of the
OperationSubscriber
that prints a messageSubscriber.onComplete
.ConsumerSubscriber
An implementation of the
OperationSubscriber
that takes aConsumer
and callsConsumer.accept(result)
whenSubscriber.onNext(T result)
is called.PrintToStringSubscriber
An implementation of the
ConsumerSubscriber
that prints the string version of theresult
onSubscriber.onNext()
.PrintDocumentSubscriber
An implementation of the
ConsumerSubscriber
that prints the json version of aDocument
onSubscriber.onNext()
.
Blocking and non blocking examples
As our subscribers contain a latch that is only released when the onComplete
method of the Subscriber
is called, we can use that latch
to block on by calling the await
method. Below are two examples using our auto-requesting PrintDocumentSubscriber
.
The first is non-blocking and the second blocks waiting for the Publisher
to complete:
// Create a publisher
Publisher<Document> publisher = collection.find();
// Non blocking
publisher.subscribe(new PrintDocumentSubscriber());
Subscriber<Document> subscriber = new PrintDocumentSubscriber();
publisher.subscribe(subscriber);
subscriber.await(); // Block for the publisher to complete
Publishers, Subscribers and Subscriptions
In general Publishers
, Subscribers
and Subscriptions
are a low level API and it’s expected that users and libraries will build more
expressive APIs upon them rather than solely use these interfaces. As a library solely implementing these interfaces, users will benefit
from this growing ecosystem, which is a core design principle of reactive streams.