Queues

A queue lets multiple publishers push messages onto it while multiple subscribers pull messages off it. Each message is delivered to only a single subscriber.

A Queue With Publishers and Subscribers

A variant of a queue is called a topic; the difference is that every subscriber receives every message. In MongoDB the classic example is the oplog, which contains all operations performed on the primary database and which all secondaries tail for changes.

We are going to look at both examples and how to implement them using MongoDB.

Work Queue

The work queue will contain messages describing work to be performed asynchronously. In our example this will be to process images that have been uploaded.

The document describing a job looks like the following, and we insert it into our queue collection.

var col = db.getSisterDB("work").queue;
col.insert({
  "input": "/img/images.png",
  "output": "/converted/images.jpg"
});

The ObjectId that is added as the _id field of every document (unless overridden) embeds a timestamp, allowing us to sort by insertion time. To fetch the next document in FIFO (First In, First Out) order, do the following.

var col = db.getSisterDB("work").queue;
var job = col.findAndModify({
    query: {}
  , sort: {_id: 1}
  , remove: true
});

This sorts the jobs in ascending order by _id, then removes and returns the first one. Since findAndModify is an atomic operation, it guarantees that only a single subscriber receives the message.
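The FIFO sort works because the first four bytes of an ObjectId encode its creation time in seconds (the shell exposes this as ObjectId's getTimestamp() method). As a plain-JavaScript sketch, outside the shell:

```javascript
// Sketch of why sorting on _id sorts by time: the first 4 bytes
// (8 hex characters) of an ObjectId hold the creation time as
// big-endian seconds since the epoch.
function objectIdTimestamp(hexId) {
  return new Date(parseInt(hexId.substring(0, 8), 16) * 1000);
}

// An _id beginning with "00000000" decodes to the epoch itself.
objectIdTimestamp("000000000000000000000000"); // 1970-01-01T00:00:00Z
```

The function name is illustrative; in the shell you would call getTimestamp() on the ObjectId directly.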

Work Queue With Priorities and Timestamps

We can extend the work queue very easily to allow for priorities and statistics by adding a couple more fields to the work document.

var col = db.getSisterDB("work").queue;
col.insert({
  "priority": 1,
  "input": "/img/images.png",
  "output": "/converted/images.jpg",
  "start_time": null,
  "end_time": null,
});

In the previous example we consumed the work item by removing it from the collection, but here we want to keep it around for reporting purposes. Let’s grab the highest-priority work item.

var col = db.getSisterDB("work").queue;
var job = col.findAndModify({
    query: {start_time: null}
  , sort: {priority: -1}
  , update: {$set: {start_time: new Date()}}
  , new: true
});
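To make the semantics concrete, here is a plain-JavaScript sketch of what that operation does in one step: pick the unstarted job with the highest priority and stamp its start time. The name claimJob is illustrative; in MongoDB, findAndModify performs this atomically on the server, which is what makes it safe with many subscribers.

```javascript
// Illustrative sketch (not a driver API): claim the highest-priority
// job that has not yet been started, and set its start_time.
function claimJob(queue, now) {
  var candidates = queue.filter(function (job) {
    return job.start_time === null;   // query: {start_time: null}
  });
  candidates.sort(function (a, b) {
    return b.priority - a.priority;   // sort: {priority: -1}
  });
  var job = candidates[0] || null;
  if (job) job.start_time = now;      // update: {$set: {start_time: ...}}
  return job;                         // new: true -> the updated document
}
```

Calling claimJob twice hands out different jobs, because a claimed job no longer matches the start_time: null filter.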

When we are done we can set the end time for the job using a simple update.

var col = db.getSisterDB("work").queue;
col.update({_id: job._id}, {$set: {end_time: new Date()}});

Stock Ticker Topic

The stock ticker topic allows multiple applications to listen to a live stream of stock price data. For the topic we want to ensure maximum throughput, which we can achieve by using a special type of collection in MongoDB called a capped collection.

A capped collection is essentially a ring buffer: it has a fixed size, and once writes exceed that size the oldest documents are overwritten.

A Ring Buffer
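The overwrite behavior can be sketched in a few lines of plain JavaScript. Note this is a simplification: a real capped collection is bounded by bytes on disk, while this sketch is bounded by document count.

```javascript
// Sketch of ring-buffer overwrite semantics. A real capped collection
// is sized in bytes; here we cap by document count for simplicity.
function RingBuffer(maxDocs) {
  this.maxDocs = maxDocs;
  this.docs = [];
}

RingBuffer.prototype.insert = function (doc) {
  this.docs.push(doc);
  if (this.docs.length > this.maxDocs) {
    this.docs.shift(); // the oldest document is overwritten
  }
};
```

Inserting five documents into a three-document buffer keeps only the newest three, still in insertion order; that preserved order is what makes tailing possible.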

The benefit of a capped collection is that it allows tailing, meaning applications can listen for new documents as they are inserted. Let’s set up our stock ticker schema.

{
  "time": ISODate("2014-01-01T10:01:22Z"),
  "ticker": "PIP",
  "price": 4.45
}

Create a new capped collection.

var db = db.getSisterDB("finance");
db.createCollection("ticker", {capped:true, size:100000})

Let’s boot up a new shell and set up a producer of tickers that emits a random price between 0 and 100 for PIP once a second.

var db = db.getSisterDB("finance");
while(true) {
  db.ticker.insert({
    time: new Date(),
    ticker: "PIP",
    price: 100 * Math.random()
  });

  sleep(1000);
}

Let’s boot up a consumer for the tickers using a tailable cursor.

var db = db.getSisterDB("finance");
var cursor = db.ticker.find({time: {$gte: new Date()}})
  .addOption(DBQuery.Option.tailable)
  .addOption(DBQuery.Option.awaitData);

while(cursor.hasNext()) {
  print(JSON.stringify(cursor.next(), null, 2))
}

This consumer will receive any ticker prices with a timestamp equal to or newer than the time at which it started.

Time To Live Indexes (TTL)

MongoDB 2.4 or higher supports a type of index called a TTL index that lets the server expire documents gradually over time. Take the following document.

{
  "hello": "world",
  "created_on": ISODate("2014-01-01T10:01:22Z")
}

Say we only want to keep the last 24 hours worth of data. This could be accomplished by performing a bulk remove of documents using a batch job, but it can be solved more elegantly using a TTL index.

var db = db.getSisterDB("data");
var numberOfSeconds = 60 * 60 * 24; // 60 sec * 60 min * 24 hours
db.expire.ensureIndex({ "created_on": 1}, {expireAfterSeconds: numberOfSeconds })

As documents expire against the TTL index they will be removed gradually over time.
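The rule the server applies can be expressed as a one-line predicate (isExpired is an illustrative name, not a MongoDB function): a document becomes eligible for removal once the indexed field is more than expireAfterSeconds in the past.

```javascript
// Illustrative predicate for TTL eligibility (not a MongoDB API):
// a document qualifies once created_on is over expireAfterSeconds old.
function isExpired(createdOn, expireAfterSeconds, now) {
  return (now.getTime() - createdOn.getTime()) / 1000 >= expireAfterSeconds;
}
```

Eligible is the key word: as the next section explains, eligible documents are not necessarily removed immediately.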


The TTL index does not expire documents in hard real time. It only guarantees that a document will be removed some time after it passes the expiry threshold; how long after varies with the load on MongoDB and other currently running operations.