Friday, April 05, 2013

MongoDB Pub/Sub with Capped Collections

If you've been following this blog for any length of time, you know that my NoSQL database of choice is MongoDB. One thing that MongoDB isn't known for, however, is building a publish / subscribe system. Redis, on the other hand, is known for having a high-bandwidth, low-latency pub/sub protocol. One thing I've always wondered is whether I can build a similar system atop MongoDB's capped collections, and if so, what the performance would be. Read on to find out how it turned out...

Capped Collections

If you've not heard of capped collections before, they're a nice little feature of MongoDB that lets you have a high-performance circular queue. Capped collections have the following nice features:

  • They "remember" the insertion order of their documents
  • They store inserted documents in the insertion order on disk
  • They remove the oldest documents in the collection automatically as new documents are inserted

However, you give up some things with capped collections:

  • They have a fixed maximum size
  • You cannot shard a capped collection
  • Any update to a document in a capped collection must not cause the document to grow (i.e. not every $set operation will work, and no $push or $pushAll will)
  • You may not explicitly .remove() documents from a capped collection

To create a capped collection, just issue the following command (all the examples below are in Python, but you can use any driver you want including the Javascript mongo shell):

db.create_collection(
    'capped_collection',
    capped=True,
    size=size_in_bytes,     # required
    max=max_number_of_docs, # optional
    autoIndexId=False)      # optional

In the example above, I've created a collection that takes up size_in_bytes bytes on disk, will contain no more than max_number_of_docs, and which does not create an index on the _id field as would normally happen. Above, I mentioned that the capped collection remembers the insertion order of its documents. If you issue a find() with no sort specified, or with a sort of ('$natural', 1), then MongoDB will sort your result in insertion order. (A sort of ('$natural', -1) will likewise sort the result in reverse insertion order.) Since insertion order is the same as the on-disk ordering, these queries are extremely fast. To see this, let's create two collections, one capped and one uncapped, and fill both with small documents:

size = 100000

# Create the collection
db.create_collection(
    'capped_collection', 
    capped=True, 
    size=2**20, 
    autoIndexId=False)
db.create_collection(
    'uncapped_collection', 
    autoIndexId=False)

# Insert small documents into both
for x in range(size):
    db.capped_collection.insert({'x':x}, manipulate=False)
    db.uncapped_collection.insert({'x':x}, manipulate=False)

# Go ahead and index the 'x' field in the uncapped collection
db.uncapped_collection.ensure_index('x')
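
Before timing anything, here's a quick sanity check (a sketch; it assumes db is the same PyMongo Database object used above). It confirms that the first collection really was created capped, and it uses a reverse $natural sort to peek at the most recently inserted document:

# Confirm the collection is capped; options() reports the creation options,
# so this should show something like {u'capped': True, u'size': 1048576}
print(db.capped_collection.options())

# A descending $natural sort walks the on-disk (insertion) order backwards,
# so this returns the most recently inserted document
print(db.capped_collection.find_one(sort=[('$natural', -1)]))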

Now we can see the performance gains by executing find() on each. For this, I'll use IPython, IPyMongo, and the magic %timeit function:

In [72] (test): %timeit x=list(db.capped_collection.find())
1000 loops, best of 3: 708 us per loop
In [73] (test): %timeit x=list(db.uncapped_collection.find().sort('x'))
1000 loops, best of 3: 912 us per loop

So we get a moderate speedup, which is nice, but not spectacular. What becomes really interesting with capped collections is that they support tailable cursors.

Tailable cursors

If you're querying a capped collection in insertion order, you can pass a special flag to find() that says that it should "follow the tail" of the collection if new documents are inserted rather than returning the result of the query on the collection at the time the query was initiated. This behavior is similar to the behavior of the Unix tail -f command, hence its name. To see this behavior, let's query our capped collection with a 'regular' cursor as well as a 'tailable' cursor. First, the 'regular' cursor:

In [76] (test): cur = db.capped_collection.find()

In [77] (test): cur.next()
       Out[77]: {u'x': 0}

In [78] (test): cur.next()
       Out[78]: {u'x': 1}

In [79] (test): db.capped_collection.insert({'y': 1})
       Out[79]: ObjectId('515f205cfb72f0385c3c2414')

In [80] (test): list(cur)
       Out[80]:
[{u'x': 2},
 ...
 {u'x': 99}]

Notice above that the document we inserted {'y': 1} is not included in the result since it was inserted after we started iterating. Now, let's try a tailable cursor:

In [81] (test): cur = db.capped_collection.find(tailable=True)

In [82] (test): cur.next()
       Out[82]: {u'x': 1}

In [83] (test): cur.next()
       Out[83]: {u'x': 2}

In [84] (test): db.capped_collection.insert({'y': 2})
       Out[84]: ObjectId('515f20ddfb72f0385c3c2415')

In [85] (test): list(cur)
       Out[85]:
[{u'x': 3},
 ...
 {u'x': 99},
 {u'_id': ObjectId('515f205cfb72f0385c3c2414'), u'y': 1},
 {u'_id': ObjectId('515f20ddfb72f0385c3c2415'), u'y': 2}]

Now we see that both the "y" document we created before and the one created during this cursor's iteration are included in the result.

Waiting on data

While tailable cursors are nice for picking up the inserts that happened while we were iterating over the cursor, one thing that a true pub/sub system needs is low latency. Polling the collection to see if messages have been inserted is a non-starter from a latency standpoint because you have to do one of two things:

  • Poll continuously, using prodigious server resources
  • Poll intermittently, increasing latency

Tailable cursors have another option you can use to "fix" the above problems: the await_data flag. This flag tells MongoDB to actually wait a second or two on an exhausted tailable cursor to see if more data is going to be inserted. In PyMongo, the way to set this flag is quite simple:

cur = db.capped_collection.find(
    tailable=True,
    await_data=True)
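
One detail worth knowing before we build on this: even with await_data, a tailable cursor eventually dies on the server (and it dies immediately if the query currently matches nothing), so a consumer has to notice that and re-create it. Here's a rough sketch of one common consumption pattern (handle_message is a hypothetical callback, and this is an illustration rather than the MongoTools implementation):

import time

def tail(collection, handle_message):
    # Follow the capped collection forever, re-opening the cursor when it dies
    while True:
        cur = collection.find(tailable=True, await_data=True)
        while cur.alive:
            try:
                handle_message(cur.next())
            except StopIteration:
                # No new documents right now; await_data already made the
                # server wait briefly, so just loop and poll again
                pass
        # The cursor died (e.g. the collection was empty); back off briefly
        time.sleep(0.1)

The dispatch loops later in this post use the same idea of re-creating the cursor whenever it dies or is exhausted.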

Building a pub/sub system

OK, now that we have a capped collection, with tailable cursors awaiting data, how can we make this into a pub/sub system? The basic approach is:

  • We use a single capped collection of moderate size (let's say 32kB) for all messages
  • Publishing a message consists of inserting a document into this collection with the following format: { 'k': topic, 'data': data } (sketched just after this list)
  • Subscribing to the collection is a tailable query on the collection, using a regular expression to only get the messages we're interested in.
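
In its simplest form, the publish step is nothing more than an insert (a quick sketch; simple_pub is a hypothetical helper, and the full version with a sequence number appears later in this post):

def simple_pub(collection, topic, data=None):
    # Publishing a message is just an insert into the capped collection
    collection.insert({'k': topic, 'data': data}, manipulate=False)

simple_pub(db.capped_collection, 'foo.bar', {'hello': 'world'})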

The actual subscribing query we use is similar to the following:

def get_cursor(collection, topic_re, await_data=True):
    options = { 'tailable': True }
    if await_data:
        options['await_data'] = True
    cur = collection.find(
        { 'k': topic_re },
        **options)
    cur = cur.hint([('$natural', 1)]) # ensure we don't use any indexes
    return cur

Once we have the get_cursor function, we can do something like the following to execute the query:

import re, time
while True:
    cur = get_cursor(
        db.capped_collection, 
        re.compile('^foo'), 
        await_data=True)
    for msg in cur:
        do_something(msg)
    time.sleep(0.1)

Of course, the system above has a couple of problems:

  • We have to receive every message in the collection before we get to the 'end'
  • We have to go back to the beginning if we ever exhaust the cursor (and its await_data delay)

The way we can avoid these problems is by adding a sequence number to each message.

Sequences

"But wait," I imagine you to say, "MongoDB doesn't have an autoincrement field like MySQL! How can we generate sequences?" The answer lies in the find_and_modify() command, coupled with the $inc operator in MongoDB. To construct our sequence generator, we can use a dedicated "sequence" collection that contains nothing but counters. Each time we need a new sequence number, we perform a find_and_modify() with $inc and get the new number. The code for this turns out to be very short:

class Sequence(object):

    def __init__(self, db, name='mongotools.sequence'):
        self._db = db
        self._name = name

    def cur(self, name):
        doc = self._db[self._name].find_one({'_id': name})
        if doc is None: return 0
        return doc['value']

    def next(self, sname, inc=1):
        doc = self._db[self._name].find_and_modify(
            query={'_id': sname},
            update={'$inc': { 'value': inc } },
            upsert=True,
            new=True)
        return doc['value']
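
Here's a quick usage sketch (assuming a fresh database and that db is a PyMongo Database object): each call to next() for a given name hands back a strictly increasing integer, and cur() peeks at the current value without incrementing it.

seq = Sequence(db)
print(seq.next('capped_collection'))  # 1 (the upsert creates the counter)
print(seq.next('capped_collection'))  # 2
print(seq.cur('capped_collection'))   # 2 (peek without incrementing)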

Once we have the ability to generate sequences, we can now add a sequence number to our messages on publication:

def pub(collection, sequence, key, data=None):
    doc = dict(
        ts=sequence.next(collection.name),
        k=key,
        data=data)
    collection.insert(doc, manipulate=False)
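
Publishing a couple of messages then looks something like this (a sketch, reusing the capped collection and the Sequence class from above):

seq = Sequence(db)
pub(db.capped_collection, seq, 'foo.bar', {'hello': 'world'})
pub(db.capped_collection, seq, 'foo.baz', {'n': 1})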

Our subscribing query, unfortunately, needs to get a bit more complicated:

def get_cursor(collection, topic_re, last_id=-1, await_data=True):
    options = { 'tailable': True }
    spec = { 
        'ts': { '$gt': last_id }, # only new messages
        'k': topic_re }
    if await_data:
        options['await_data'] = True
    cur = collection.find(spec, **options)
    cur = cur.hint([('$natural', 1)]) # ensure we don't use any indexes
    return cur

And our dispatch loop likewise must keep track of the sequence number:

import re, time
last_id = -1
while True:
    cur = get_cursor(
        db.capped_collection,
        re.compile('^foo'),
        last_id=last_id,      # pick up where we left off
        await_data=True)
    for msg in cur:
        last_id = msg['ts']
        do_something(msg)
    time.sleep(0.1)

We can actually improve upon this a tiny bit by finding the ts field of the last document in the collection and using it to initialize our last_id value:

last_id = -1
cur = db.capped_collection.find().sort([('$natural', -1)])
for msg in cur:
    last_id = msg['ts']
    break
...

So we've fixed the problem of processing messages multiple times, but we still have a slow scan of the whole capped collection on startup. Can we fix this? It turns out we can, but not without questionable "magic."

Now, for some questionable magic...

You may be wondering why I would use a strange name like ts to hold a sequence number. It turns out that there is a poorly documented option for cursors that we can abuse to substantially speed up the initial scan of the capped collection: the oplog_replay option. As is apparent from the name of the option, it is mainly used to replay the "oplog", that magic capped collection that makes MongoDB's replication internals work so well. The oplog uses a ts field to indicate the timestamp of a particular operation, and the oplog_replay option requires the use of a ts field in the query.

Now since oplog_replay isn't really intended to be (ab)used by us mere mortals, it's not directly exposed in the PyMongo driver. However, we can manage to get to it via some trickery:

from pymongo.cursor import _QUERY_OPTIONS

def get_cursor(collection, topic_re, last_id=-1, await_data=True):
    options = { 'tailable': True }
    spec = { 
        'ts': { '$gt': last_id }, # only new messages
        'k': topic_re }
    if await_data:
        options['await_data'] = True
    cur = collection.find(spec, **options)
    cur = cur.hint([('$natural', 1)]) # ensure we don't use any indexes
    if await_data:
        cur = cur.add_option(_QUERY_OPTIONS['oplog_replay'])
    return cur

(Yeah, I know it's bad to import an underscore-prefixed name from another module. But it's marginally better than simply saying oplog_replay_option=8, which is the other way to make this whole thing work....)

Performance

So now we have the skeleton of a pubsub system using capped collections. If you'd like to use it yourself, all the code is available on Github in the MongoTools project. So how does it perform? Well obviously the performance depends on the particular type of message passing you're doing. In the MongoTools project, there are a couple of Python example programs latency_test_pub.py and latency_test_sub.py in the mongotools/examples/pubsub directory that allow you to do your own benchmarking. In my personal benchmarking, running everything locally with small messages, I'm able to get about 1100 messages per second with a latency of 2.5ms (with publishing options -n 1 -c 1 -s 0), or about 33,000 messages per second with a latency of 8ms (this is with -n 100 -c 1 -s 0). For pure publishing bandwidth (the subscriber can't consume this many messages per second), I seem to max out at around 75,000 messages (inserts) per second.

So what do you think? With the MongoTools pubsub module, is MongoDB a viable competitor to Redis as a low-latency, high-bandwidth pub/sub channel? Let me know in the comments below!

22 comments:

  1. Nice writing! I have a similar implementation and find it extremely useful, since it saves me the trouble of maintaining one more application (Redis) in the stack; performance is not bad either.
    The only thing I missed is the hacking of the "ts" field, although I use 'oplog_replay' when I read from the RS oplog.

    And one question... why not use a mongo ObjectId for filling the 'ts' field instead of creating your own sequence?

    Replies
    1. Thanks for the comment, Nick!

      The reason I'm using a manually generated sequence number is that ObjectIds aren't actually sequence numbers (ObjectIds generated on 2 different clients can come in "out of order" for instance). What I need for this is a way to ensure that I never handle a message twice; an always-increasing sequence number is a fairly inexpensive way to do that.

      Thanks again for the comment!

    2. Why not use a server-side generated Date or ObjectId?

    3. Neither dates nor ObjectIDs are server-side generated in MongoDB, so there's no guarantee that events are consumed in the same order they're produced.

    4. According to the docs http://docs.mongodb.org/manual/reference/method/db.collection.insert/#id-field
      > Most drivers create an ObjectId and insert the _id field, but the mongod will create and populate the _id if the driver or application does not.

      As for server side generated dates, you can do:

    5. And this allowed me to evaluate an expression on the server side:
      db.eval('db.messages.insert({ts: new Date()})')

    6. Good point on the _id field. I would *not*, however, assume that server-generated _id fields are guaranteed to be monotonically increasing.

      As an aside, I've actually abandoned this approach in favor of using the oplog directly (just create insertion or update events in some collection and you get a monotonically increasing "ts" field in the oplog automatically)

    7. eval() could be used to generate new dates on the server, I suppose, but it suffers from having to eval() Javascript, which causes a speed penalty. It's also not guaranteed that dates are unique when they are generated (multi-client again), so you'd need a unique index to take that into account.

      Tailing the oplog fixes those issues and more, though. A big issue I discovered when using the approach in this blog post is that there is a gap between when the sequence number is generated and when the event is inserted, so you can have a race condition where client A generates a sequence number, client B generates a sequence number, client B inserts an event, the subscriber handles client B's event and re-queries the server, updating its internal counter, then client A inserts its event. In this case, client A's event will never be handled because the internal 'tail' of the capped collection has already proceeded beyond client A's sequence number.

      Additionally, it's much faster to just insert into a collection and count on the oplog to get updated by the replication machinery rather than perform the 2 separate operations of a) generate a sequence number and b) insert the message into the collection.

      Thanks for the comments!

    8. I am using this to generate the _id on the server side:
      messages.database.eval('db.messages.insert({})'.format(json.dumps(
      {'data': message, 'channel': channel}, ensure_ascii=False)))

      I've seen your code using oplog, but I found it too complex. Does it have to use a dedicated database if you are reading `local`?

      > I would *not*, however, assume that server-generated _id fields are guaranteed to be monotonically increasing.

      I would (http://docs.mongodb.org/manual/reference/object-id/); an ObjectId contains "a 3-byte counter, starting with a random value", which is, I guess, incremented upon each new object id.

      Thanks for the article and your comments!

    9. With server-side generated object ids, no counters are needed, so there is no race condition (as I see it). Also, not every installation is run in master mode with an oplog.

    10. As for server-side ObjectIds being monotonic, there are two possible issues. One is counter rollover. At some point your sequence number will contain 0xfffffe, 0xffffff, 0x000000, 0x000001..., which would lead to an order inversion if two events in the same second spanned the rollover. Secondly, I haven't read the MongoDB server code here, but it's certainly possible that a different "machine ID" or "process ID" is used for different threads within the server to eliminate locking, which could lead to consistent order inversion within a second. The same page you referenced did contain a warning not to assume monotonicity (though it didn't explicitly state whether server-side monotonicity is guaranteed).

      As for the local database, that's a database present on all MongoDB nodes that is never replicated to the rest of the set. That's why the oplog is located in the local database (replicating the oplog would cause a circular replication dependency). And while using the oplog for pub/sub does require running a replica set (in most cases you should probably be doing this in production anyway), it does not require any secondaries. For instance, in development and testing I run a single-node replica set (no secondaries) for just this purpose.

    11. Oh, and also regarding the local database and replication: all operations on all your non-local databases are automatically appended to the oplog, so there's no dedicated database needed for that approach. (Operations in the oplog specify both the database and collection to which they pertain.)

  2. Yaps, you are right: in a multi-client scenario mongo ObjectIds can possibly overlap.

    Thanks for replying and your post.

    Regards Nick Milon

  3. Rick,

    This is a great post, thanks, but it leaves me wondering one thing: What happens in the case of the classic slow consumer? E.g. what if I can insert records into the capped collection faster than I'm able to get them to the consumers, and the capped collection fills up? Won't I simply lose data in that case? Or is there a way I'm not seeing to "block" the producer temporarily?

    Thanks again for the post

    Mike

    Replies
    1. Hi Mike,

      Thanks for the comment. In the case of the slow consumer, it is possible to lose data. Blocking the producer is certainly an approach, but in my case I was able to size the capped collection to handle the occasional slow consumer, and in general I have a large number of consumers for each producer.

  4. The reply button under your comment doesn't work, so I reply here.
    I meant that local oplog.rs records insertions not only into the messages collection, but also into other collections? Or not?

    > which would lead to an order inversion if two events in the same second spanned the rollover

    I thought of this, but this means over 16 mln documents per second, which is unrealistic. Anyway, I think using server-generated ids suits my case, where sequence within a second is not so important (although on my server the machine identifier and process id are the same for inserted records).

    Replies
    1. For the oplog, I think you have things backwards. Inserts into messages (as well as all other collections in your database) cause new operation documents to be inserted into the local.oplog.rs collection. (This means that if you tail the oplog you'll have to filter out operations that you don't care about.)

      And order inversion doesn't require 16 mln documents per second. It only requires 2 documents in the same second that happen to have the sequence numbers 0xffffff and 0x000000, which can happen once every 16 mln documents (which is not unrealistic over the course of weeks or months). And of course, the first rollover will occur at a random time after the counter is initialized, since it's initialized with a random value.

      Thanks for the comments!

  5. > This means that if you tail the oplog you'll have to filter out operations that you don't care about.

    That's what I meant. Filtering out also takes resources. And IIRC the oplog is a capped collection with a limited size, so it could be filled with other records if writes are intensive enough.

    > It only requires 2 documents in the same second that happen to have the sequence numbers 0xffffff and 0x000000

    Rollover doesn't affect the order, because it's not within a second. Object id = current second + machine id + process id + counter.
    For object_id_1 > object_id_2 to hold while object_id_1 was generated earlier than object_id_2, the overflow must happen within one second, which is possible only if 16 million ids were generated during the same second (while most of the inserted documents have ids generated on clients).

    Replies
    1. With the oplog, the filtering happens on the server (you query the oplog for only the events you're interested in), so it doesn't actually consume more resources than the approach in this blog post. So you'd do something like db.oplog.rs.find({'ns': 'my-db.messages', 'ts': {'$gt': last_ts_field_I_handled}}).sort('$natural')

      For rollover, keep in mind that, while the counter may not be *thread* local, it is certainly *machine* local, and is not a shared counter with the clients. Thus we can ignore them entirely for this analysis. So consider two ObjectIds generated in the same second *on the server*. In this case, the first one is [current_time, server_machine_id, server_pid, 0xffffff], and the second one is [current_time, server_machine_id, server_pid, 0x000000]. So you get rollover, and order inversion. Now, you may have a slow event stream, which would reduce the likelihood of this, but if 50% of your events occur in the same second as the previous event, this inversion will happen, on average, every 48 mln events, with higher likelihood if you have a faster flow of events. (And the approach in this blog post can generate 1000s of events per second, so you *can* get inversions almost every 28 mln events).

    2. Ah yes, you are right about the rollover.

    3. Looking at the sources (I can't really read them, but: https://github.com/mongodb/libbson/blob/master/src/bson/bson-oid.c#L104):

      > The bson_oid_t generated by this function is not guaranteed to be globally unique. Only unique within this context. It is however, guaranteed to be sequential.

    4. No, the sources are for something different. I've restarted the mongodb server and the ids have changed in the middle.
