- MongoDB Async Driver
- Reference
- Observables
Observables
The MongoDB Async Driver is fully callback-based and makes extensive use of SingleResultCallback<T>
to achieve this. The SingleResultCallback<T>
interface requires the implementation of a single method onResult(T result, Throwable t)
which is called once the operation has completed or errored. The result
parameter contains the result of the operation if successful. If the operation failed for any reason then the t
contains the Throwable
reason for the failure. This pattern allows the users application logic to be deferred until the underlying network IO to MongoDB has been completed.
The callback pattern is extremely flexible but can quickly become unwieldy if the application logic requires a chain of operations. Nesting of callbacks can make code harder to read and give the appearance of making the codebase more complex that it actually is. To help with this we also have released two observable based asynchronous drivers:
These observable drivers follow similar patterns that split the logic into onNext
, onError
and onComplete(d)
methods. These methods split out the logic used by SingleResultCallback<T>.onResult(T result, Throwable t)
into individual components that can make the code easier to reason with.
The MongoDB Async Driver provides a factory and interfaces that do the heavy lifting of converting callback-based operations into an observable operations. There are three interfaces Observable
, Subscription
and Observer
. The Observables
helpers convert any callback-based operations into observable operations.
Note
The interfaces are similar to Publisher
, Subscription
and Subscriber
interfaces from the reactive streams JVM implementation. However, we prefer the name Observerable
to Publisher
and Observer
to Subscriber
for readability purposes.
Observable
The Observable
represents a MongoDB operation which emits its results to the Observer
based on demand requested by the Subscription
to the Observable
.
Subscription
A Subscription
represents a one-to-one lifecycle of an Observer
subscribing to an Observable
. A Subscription
to an Observable
can only be used by a single Observer
. The purpose of a Subscription
is to control demand and to allow unsubscribing from the Observable
.
Observer
An Observer
provides the mechanism for receiving push-based notifications from the Observable
. Demand for these events is signalled by its Subscription
. On subscription to an Observable
the Observer
will be passed the Subscription
via the onSubscribe(Subscription subscription)
.
Demand for results is signaled via the Subscription
and any results are passed to the onNext(TResult result)
method. If there is an error for any reason the onError(Throwable e)
will be called and no more events passed to the Observer
. Alternatively, when the Observer
has consumed all the results from the Observable
the onComplete()
method will be called.
Wrapping a MongoIterable
With the Observables
factory creating an Observable
from a MongoIterable
is simple.
In the following example we iterate and print out all json forms of documents in a collection:
Observables.observe(collection.find()).subscribe(new Observer<Document>(){
@Override
void onSubscribe(final Subscription subscription) {
System.out.println("Subscribed and requesting all documents");
subscription.request(Long.MAX_VALUE);
}
@Override
void onNext(final Document document) {
System.out.println(document.toJson());
}
@Override
void onError(final Throwable e) {
System.out.println("There was an error: " + e.getMessage());
}
@Override
void onComplete() {
System.out.println("Finished iterating all documents");
}
});
Wrapping a callback-based method
Creating an Observable
from any callback-based methods requires the wrapping of the method inside a Block
. This allows the execution of the operation to be delayed, until demand is request by the Subscription
. The method must use the supplied callback to convert the results into an Observable
.
In the following example we print out the count of the number of documents in a collection:
Block<SingleResultCallback<Long>> operation = new Block<SingleResultCallback<Long>>() {
@Override
void apply(final SingleResultCallback<Long> callback) {
collection.countDocuments(callback);
}
};
// Or in Java 8 syntax:
operation = (Block<SingleResultCallback<Long>>) collection::countDocuments;
Observables.observe(operation).subscribe(new Observer<Long>(){
@Override
void onSubscribe(final Subscription subscription) {
System.out.println("Subscribed and requesting the count");
subscription.request(1);
}
@Override
void onNext(final Long count) {
System.out.println("The collection has " + count + " documents");
}
@Override
void onError(final Throwable e) {
System.out.println("There was an error: " + e.getMessage());
}
@Override
void onComplete() {
System.out.println("Finished");
}
});
Back Pressure
In the following example, the Subscription
is used to control demand when iterating an Observable
. This is similar in concept to the MongoIterable.forEach
method but allows demand-driven iteration:
Observables.observe(collection.find()).subscribe(new Observer<Document>(){
private long batchSize = 10;
private long seen = 0;
private Subscription subscription;
@Override
void onSubscribe(final Subscription subscription) {
this.subscription = subscription;
subscription.request(batchSize);
}
@Override
void onNext(final Document document) {
System.out.println(document.toJson());
seen += 1;
if (seen == batchSize) {
seen = 0;
subscription.request(batchSize);
}
}
@Override
void onError(final Throwable e) {
System.out.println("There was an error: " + e.getMessage());
}
@Override
void onComplete() {
System.out.println("Finished iterating all documents");
}
});