Friday, June 01, 2012

Using MongoDB's New Aggregation Framework in Python (MongoDB Aggregation Part 2)

Continuing on in my series on MongoDB and Python, this article will explore the new aggregation framework introduced in MongoDB 2.1. If you're just getting started with MongoDB, you might want to read the previous articles in the series first:

And now that you're all caught up, let's jump right in....

Why a new framework?

If you've been following along with this article series, you've been introduced to MongoDB's mapreduce command, which up until MongoDB 2.1 has been the go-to aggregation tool for MongoDB. (There's also the group() command, but it's really no more than a less-capable and un-shardable version of mapreduce(), so we'll ignore it here.) So if you already have mapreduce() in your toolbox, why would you ever want something else?

Mapreduce is hard; let's go shopping

The first motivation behind the new framework is that, while mapreduce() is a flexible and powerful abstraction for aggregation, it's really overkill in many situations, as it requires you to re-frame your problem into a form that's amenable to calculation using mapreduce(). For instance, when I want to calculate the mean value of a property in a series of documents, trying to break that down into appropriate map, reduce, and finalize steps imposes some extra cognitive overhead that we'd like to avoid. So the new aggregation framework is (IMO) simpler.

The Javascript global interpreter lock is evil

The MapReduce algorithm, the basis of MongoDB's mapreduce() command, is a great approach to solving Embarrassingly Parallel problems. Each invocation of map, reduce, and finalize is completely independent of the others (though the map/reduce/finalize phases are order-dependent), so we should be able to dispatch these jobs to run in parallel without any problems.

Unfortunately, due to MongoDB's use of the SpiderMonkey Javascript engine, each mongod process is restricted to running a single Javascript thread at a time. So in order to get any parallelism with a MongoDB mapreduce(), you must run it on a sharded cluster, and on a cluster with N shards, you're limited to N-way parallelism.

The new aggregation framework, on the other hand, is implemented in C++ and has its own BSON-based language, so you're not hitting the Javascript interpreter at all.

Mapreduce can't actually do everything

If you read the my previous article on MongoDB aggregation, you might have noticed that the output format of a mapreduce() command is pretty restrictive, always being of the form

{ _id: ..., /* some (possibly compound) key */
  value: ... /* some (possibly compound) value */

But what if you need the results in some other format? With mapreduce, you're stuck.

Another thing that mapreduce() doesn't do well is multiple computation phases. Of course, you can always run mapreduce() multiple times, but it would be nice if there were a way to to it all in one command.

The new aggregation framework, on the other hand, has phased computation at its heart, with the pipeline the fundamental structure of aggregation.

A series of tubes

So now to the rescue comes MongoDB's new aggregation framework. To understand the new framework, the first thing you need to know is that it's all based around the aggregation pipeline. Conceptually, MongoDB sends each document in a collection into the first operator on the pipeline, which may modify it and send it along to the next operation, similar to a UNIX pipeline.

Another way you can look at the new aggregation framework is that it's a 'super-find'. The aggregate command can do anthing the find() method can do, and quite a bit more. For instance, the following query:

q = db.my_collection.find({...some query...}, {...some fields...})
q = q.sort(...some sort...)
q = q.skip(...some skip...)
q = q.limit(...some limit...)

would be implemented by the following aggregate pipeline:

pipeline = [
    { '$match': ...some query... },
    { '$sort': ...some sort...},
    { '$skip': ...some skip... },
    { '$limit': ...some limit... }
    { '$project': ...some fields...},
q = db.command('aggregate', 'my_collection', pipeline=pipeline )

When you're using aggregate, one thing to keep in mind is that order matters. Because of this, you want to place the most selective parts of your pipeline first, in order to reduce the number of documents successive pipeline stages have to process.

Above, the pipeline (conceptually) does the following:

  • For every document in 'my_collection', pass along only those documents that $match the given query
  • $sort all the documents given up to this point by the sort key, and then send the sorted documents along
  • $skip over the given number of documents, not sending them along to the successive stages of the pipeline, and then send the rest down the pipe.
  • Only send the given $limit number of documents to the next stages of the pipeline
  • $project the documents into a new format, sending along each re-shaped document

At the end, we have something that duplicates the output of the query above. Now, if we had decided to $sort, then $match, the server could have quite a few more documents to $sort.

Actually aggregating

Although it's nice that MongoDB has grown a 'super-find' operation, our aggregation pipelines can do much more. Suppose we want to compute an average of a number of x values, grouped by y values. First, we'll insert some random data for testing (I'm running my MongoDB server on port 27018, rather than the default 27017):

>>> import pymongo
>>> import random
>>> conn = pymongo.Connection(port=27018)
>>> db = conn.agg
...     dict(x=random.random(), y=random.randint(0,10))
...     for i in range(50)])

Basic grouping and document shaping

Now, to compute the average, we can use the $group pipeline operator:

>>> pipeline = [
...     {'$group': {
...         '_id': '$y',
...         'mean': {'$avg': '$x'} } } ]
>>> db.command('aggregate', 'data', pipeline=pipeline)
{u'ok': 1.0, u'result': [
    {u'_id': 8, u'mean': 0.45158769756086975}, 
    {u'_id': 7, u'mean': 0.10767920380630946}, 
    {u'_id': 9, u'mean': 0.537524071579183}, 
    {u'_id': 5, u'mean': 0.4712010849527597}, 
    {u'_id': 10, u'mean': 0.6450144108731366}, 
    {u'_id': 3, u'mean': 0.5829812441520267}, 
    {u'_id': 2, u'mean': 0.5139874597597852}, 
    {u'_id': 1, u'mean': 0.35474322219101523}, 
    {u'_id': 4, u'mean': 0.6813855492820068}, 
    {u'_id': 0, u'mean': 0.47123430216880813}]}

The first thing to note above is the use of $x and $y. This is the aggregation framework's syntax for referring to fields in a document. This is necessary because we can do some simple operations on documents as well. For instance, suppose we wanted to double the x values. For this, we would use the $project operator:

>>> pipeline = [
...     { '$project': {
...         '_id': 0,
...         'x': p1,
...         'y': 1,
...         'double_x': {
...             '$multiply': [ '$x', 2 ] 
...         } } } ]
>>> db.command('aggregate', 'data', pipeline=pipeline)
{u'ok': 1.0, u'result': [
    {u'y': 8, u'x': 0.0731577856277077, 
     u'double_x': 0.1463155712554154}, 
    {u'y': 8, u'x': 0.6628554147686313, 
     u'double_x': 1.3257108295372626}, 
    ... ] }

Unwinding arrays

Another nice thing we can do is to treat elements of an embedded array in a document as documents themselves. Suppose we had a number of comments embedded in a blog post, for instance:

>>> [ 
...     { 'title': 'First Post',
...       'comments': [
...            {'author': 'Someone'},
...            {'author': 'Someone Else'},
...            {'author': 'Someone'},
...            {'author': 'Someone'} ] },
...     { 'title': 'Second Post',
...       'comments': [
...            {'author': 'Another commenter'},
...            {'author': 'Someone Else'},
...            {'author': 'Someone'} ] } ] )
[ObjectId(...), ObjectId(...)]

Now, suppose we want to find out how many times each commenter has posted. To do this, we'll start by using the $unwind operator:

>>> pipeline = [ {'$unwind': '$comments'} ]
>>> db.command('aggregate', 'blog.articles', pipeline=pipeline)
{u'ok': 1.0, u'result': [
    {u'_id': ObjectId(...), 
     u'comments': {u'author': u'Someone'}, 
     u'title': u'First Post'},
    {u'_id': ObjectId('...'), 
     u'comments': {u'author': u'Someone Else'}, 
     u'First Post'}, ... ] }

Note here how the documents coming out of $unwind have the same structure as the input, except for the unwound array, which has been replaced by a single element. Once we have the unwound documents, we can add some other operators to calculate the number of posts per commenter:

>>> pipeline = [ 
...     {'$unwind': '$comments'},
...     {'$group': {
...         '_id': '$',
...         'comments': { '$sum': 1 } } } ]
>>> db.command('aggregate', 'blog.articles', pipeline=pipeline)
{u'ok': 1.0, u'result': [
    {u'_id': u'Another commenter', u'comments': 1}, 
    {u'_id': u'Someone Else', u'comments': 2}, 
    {u'_id': u'Someone', u'comments': 4}]}

One thing to note is that we can use $group as a sort of reverse-$unwind by using the agggregation function $push. Returning to our synthetic (x,y) example, we can "wind up" all the x values for a given y:

>>> pipeline = [
...     {'$group': {
...         '_id': '$y',
...         'xs': {'$push': '$x'} } },
...     {'$project': {
...         '_id': 0,
...         'y': '$_id',
...         'xs': 1 } },
...     {'$sort': 'y'} ]
>>> db.command('aggregate', 'data', pipeline=pipeline)
{u'ok': 1.0, u'result': [
    {u'y': 0, u'xs': [
        0.5480646819579362, 0.06989250292970206, 
        0.1483275969049508, 0.37886136252339875, 
        0.944175551482085, 0.6528854869305418, 
        0.6699875222192363, 0.038172503680066416, 
        0.29335286756814327, 0.8896985443297418, 
        0.5501587033310872] }, 
    ... ] }

Here, we first $grouped to compute the aggregate we are interested in, then applied a $project to rename the fields a bit, finishing up with a $sort to present the results in sorted order.

The new aggregation can't do everything (yet)

So now that you've seen the amazing new aggregation framework, you might be ready to drop everything else and use it. Unfortunately, there are a few things you need to be aware of that can bite you.

There is no $out operator

Sadly, the planned $out operator didn't make it into the 2.1 release of the aggregation framework. What this means is that you can't define a pipeline and have it put its results into another collection; you need to be ready to process the results when the command returns. Because of this, aggregation results are limited to 16MB due to the document size limit in MongoDB. So if you need to compute millions of groups, you're going to be stuck one of the following options:

  • Breaking up your pipeline with $match or $skip/$limit so each pipeline's results are under 16MB
  • Falling back to mapreduce
  • Puling the data out of MongoDB and doing the aggregations in some external system (this is the approach used in Zarkov, for instance). all the computation to happen on the server.

There is no explain() (yet)

If you're interested in how your queries scale in MongoDB (and you should be), you've probably become quite accustomed to the explain() command. Unfortunately, there is no corresponding command for the new aggregation framework unless you're using the nightly builds.. This means you have to be careful when constructing your pipelines. The rules of thumb I use are

  • Put your $match operator first. This operator can use an index (yay!), and you can actually test that by using the same query with find().explain().
  • If possible, put your $sort immediately after the $match. This makes the first two stages behave just like a find().sort(), which again is explain()able.
  • Be careful with the stateful operators. Most pipeline operators are stateless, so their memory usage does not change with the length of the pipeline. But $group and $sort are not, since they accumulate results in RAM. So be careful where you put them.

If you are up to using the nightly builds, or want to wait till the explain feature makes it into an official release, the always-awesome [Chris Westin][chris-westin] points out that you can use explain by passing it as an additional parameter to your aggregation command:

>>> db.command('aggregate', 'data', pipeline=pipeline, explain=True)

There are a number of other features in the new aggregation framework that I haven't covered here. For more details, visit the official docs. I'd be interested in hearing what you think about the new aggregation framework, particularly if anyone is using it in production (or pre-production). Let me know in the comments below!


  1. explain is now in the development builds. Add explain:true to your aggregation command.

  2. Thanks for the info, Chris. I've updated the post to reflect that information.

    And thanks again for the framework!

  3. Anonymous8:41 AM

    Suppose I want to compute the log N in the pipeline, the operators I see are $add, $multiply, etc. Where can I find and functions supported by aggregation. Not all computation involves those simple operators. May be a power function. How would I do that in aggregation.

    1. Thanks for the comment! Currently, only simple arithmetic expressions are supported by the aggregation framework ( From what I understand, it's straightforward to add new operators, so if there are particular things you're interested in, I'd recommend creating a JIRA ticket at