Continuing my series on MongoDB and Python, this article explores the new aggregation framework introduced in MongoDB 2.1. If you're just getting started with MongoDB, you might want to read the previous articles in the series first:
- Getting Started with MongoDB and Python
- Moving Along With PyMongo
- GridFS: The MongoDB Filesystem
- Aggregation in MongoDB (Part 1)
And now that you're all caught up, let's jump right in....
Why a new framework?
If you've been following along with this article series, you've been introduced to MongoDB's mapreduce command, which up until MongoDB 2.1 has been the go-to aggregation tool for MongoDB. (There's also the group() command, but it's really no more than a less-capable and un-shardable version of mapreduce(), so we'll ignore it here.) So if you already have mapreduce() in your toolbox, why would you ever want something else?
Mapreduce is hard; let's go shopping
The first motivation behind the new framework is that, while mapreduce() is a flexible and powerful abstraction for aggregation, it's really overkill in many situations, as it requires you to re-frame your problem into a form that's amenable to calculation using mapreduce(). For instance, when I want to calculate the mean value of a property in a series of documents, trying to break that down into appropriate map, reduce, and finalize steps imposes some extra cognitive overhead that we'd like to avoid. So the new aggregation framework is (IMO) simpler.
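For example, here's roughly what a per-key mean looks like when you force it through map/reduce/finalize with PyMongo. This is just a minimal sketch for comparison, not code from later in the post: the data collection and its x and y fields mirror the example data inserted further down, the means output collection name is made up, and db is assumed to be connected as in the later examples.

from bson.code import Code

# map: emit a partial sum and count for each y value
mapper = Code("function () { emit(this.y, {sum: this.x, count: 1}); }")

# reduce: fold the partial sums and counts together (may be called repeatedly)
reducer = Code(
    "function (key, values) {"
    "    var result = {sum: 0, count: 0};"
    "    values.forEach(function (v) {"
    "        result.sum += v.sum; result.count += v.count; });"
    "    return result; }")

# finalize: only now can the running totals become a mean
finalizer = Code("function (key, value) { return value.sum / value.count; }")

# three JavaScript functions and an output collection, just to get an average
means = db.data.map_reduce(mapper, reducer, out='means', finalize=finalizer)
print(list(means.find()))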
The Javascript global interpreter lock is evil
The MapReduce algorithm, the basis of MongoDB's mapreduce() command, is a great approach to solving embarrassingly parallel problems. Each invocation of map, reduce, and finalize is completely independent of the others (though the map/reduce/finalize phases are order-dependent), so we should be able to dispatch these jobs to run in parallel without any problems.
Unfortunately, due to MongoDB's use of the SpiderMonkey Javascript engine, each mongod process is restricted to running a single Javascript thread at a time. So in order to get any parallelism with a MongoDB mapreduce(), you must run it on a sharded cluster, and on a cluster with N shards, you're limited to N-way parallelism.
The new aggregation framework, on the other hand, is implemented in C++ and has its own BSON-based language, so you're not hitting the Javascript interpreter at all.
Mapreduce can't actually do everything
If you read my previous article on MongoDB aggregation, you might have noticed that the output format of a mapreduce() command is pretty restrictive, always being of the form

{ _id: ..., /* some (possibly compound) key */
  value: ... /* some (possibly compound) value */ }

But what if you need the results in some other format? With mapreduce(), you're stuck.
Another thing that mapreduce() doesn't do well is multiple computation phases. Of course, you can always run mapreduce() multiple times, but it would be nice if there were a way to do it all in one command.
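As a rough sketch of what that looks like in practice (assuming the db handle and data collection used later in this post; the intermediate collection names here are made up), you end up writing each phase out to a collection and then aggregating that collection again:

from bson.code import Code

# phase 1: total up x per y, writing the results to an intermediate collection
db.data.map_reduce(
    Code("function () { emit(this.y, this.x); }"),
    Code("function (key, values) { return Array.sum(values); }"),
    out='totals_by_y')

# phase 2: a second, separate mapreduce() over the intermediate results,
# e.g. to bucket the totals -- two round trips for one logical aggregation
db.totals_by_y.map_reduce(
    Code("function () { emit(Math.floor(this.value), 1); }"),
    Code("function (key, values) { return Array.sum(values); }"),
    out='total_histogram')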
The new aggregation framework, on the other hand, has phased computation at its heart, with the pipeline as the fundamental structure of an aggregation.
A series of tubes
So now to the rescue comes MongoDB's new aggregation framework. To understand the new framework, the first thing you need to know is that it's all based around the aggregation pipeline. Conceptually, MongoDB sends each document in a collection into the first operator in the pipeline, which may modify it and send it along to the next operator, similar to a UNIX pipeline.
Another way you can look at the new aggregation framework is that it's a 'super-find'. The aggregate command can do anything the find() method can do, and quite a bit more. For instance, the following query:
q = db.my_collection.find({...some query...}, {...some fields...})
q = q.sort(...some sort...)
q = q.skip(...some skip...)
q = q.limit(...some limit...)
would be implemented by the following aggregate pipeline:
pipeline = [
    { '$match': ...some query... },
    { '$sort': ...some sort... },
    { '$skip': ...some skip... },
    { '$limit': ...some limit... },
    { '$project': ...some fields... },
]
q = db.command('aggregate', 'my_collection', pipeline=pipeline)
When you're using aggregate, one thing to keep in mind is that order matters. Because of this, you want to place the most selective parts of your pipeline first, in order to reduce the number of documents successive pipeline stages have to process.
Above, the pipeline (conceptually) does the following:
- For every document in 'my_collection', pass along only those documents that $match the given query
- $sort all the documents given up to this point by the sort key, and then send the sorted documents along
- $skip over the given number of documents, not sending them along to the successive stages of the pipeline, and then send the rest down the pipe
- Only send the given $limit number of documents to the next stages of the pipeline
- $project the documents into a new format, sending along each re-shaped document
At the end, we have something that duplicates the output of the query above. Now, if we had decided to $sort, then $match, the server could have quite a few more documents to $sort.
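To make that concrete, here's what the badly-ordered version of the pipeline above would look like (purely hypothetical, using the same placeholders as before); the $sort now has to process every document in the collection before $match gets a chance to discard anything:

bad_pipeline = [
    { '$sort': ...some sort... },      # sorts the *entire* collection
    { '$match': ...some query... },    # non-matching documents are only dropped here
    { '$skip': ...some skip... },
    { '$limit': ...some limit... },
    { '$project': ...some fields... },
]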
Actually aggregating
Although it's nice that MongoDB has grown a 'super-find' operation, our aggregation pipelines can do much more. Suppose we want to compute an average of a number of x values, grouped by y values. First, we'll insert some random data for testing (I'm running my MongoDB server on port 27018, rather than the default 27017):
>>> import pymongo
>>> import random
>>> conn = pymongo.Connection(port=27018)
>>> db = conn.agg
>>> db.data.insert([
...     dict(x=random.random(), y=random.randint(0,10))
...     for i in range(50)])
[ObjectId(...)...]
Basic grouping and document shaping
Now, to compute the average, we can use the $group pipeline operator:
>>> pipeline = [
...     {'$group': {
...         '_id': '$y',
...         'mean': {'$avg': '$x'} } } ]
>>> db.command('aggregate', 'data', pipeline=pipeline)
{u'ok': 1.0,
 u'result': [
    {u'_id': 8, u'mean': 0.45158769756086975},
    {u'_id': 7, u'mean': 0.10767920380630946},
    {u'_id': 9, u'mean': 0.537524071579183},
    {u'_id': 5, u'mean': 0.4712010849527597},
    {u'_id': 10, u'mean': 0.6450144108731366},
    {u'_id': 3, u'mean': 0.5829812441520267},
    {u'_id': 2, u'mean': 0.5139874597597852},
    {u'_id': 1, u'mean': 0.35474322219101523},
    {u'_id': 4, u'mean': 0.6813855492820068},
    {u'_id': 0, u'mean': 0.47123430216880813}]}
The first thing to note above is the use of $x and $y. This is the aggregation framework's syntax for referring to fields in a document. This is necessary because we can do some simple operations on documents as well. For instance, suppose we wanted to double the x values. For this, we would use the $project operator:
>>> pipeline = [
...     { '$project': {
...         '_id': 0,
...         'x': 1,
...         'y': 1,
...         'double_x': {
...             '$multiply': [ '$x', 2 ] } } } ]
>>> db.command('aggregate', 'data', pipeline=pipeline)
{u'ok': 1.0,
 u'result': [
    {u'y': 8, u'x': 0.0731577856277077, u'double_x': 0.1463155712554154},
    {u'y': 8, u'x': 0.6628554147686313, u'double_x': 1.3257108295372626},
    ... ] }
Unwinding arrays
Another nice thing we can do is to treat elements of an embedded array in a document as documents themselves. Suppose we had a number of comments embedded in a blog post, for instance:
>>> db.blog.articles.insert( [
...     { 'title': 'First Post',
...       'comments': [
...           {'author': 'Someone'},
...           {'author': 'Someone Else'},
...           {'author': 'Someone'},
...           {'author': 'Someone'} ] },
...     { 'title': 'Second Post',
...       'comments': [
...           {'author': 'Another commenter'},
...           {'author': 'Someone Else'},
...           {'author': 'Someone'} ] } ] )
[ObjectId(...), ObjectId(...)]
Now, suppose we want to find out how many times each commenter has posted. To do this, we'll start by using the $unwind operator:
>>> pipeline = [ {'$unwind': '$comments'} ]
>>> db.command('aggregate', 'blog.articles', pipeline=pipeline)
{u'ok': 1.0,
 u'result': [
    {u'_id': ObjectId(...),
     u'comments': {u'author': u'Someone'},
     u'title': u'First Post'},
    {u'_id': ObjectId('...'),
     u'comments': {u'author': u'Someone Else'},
     u'title': u'First Post'},
    ... ] }
Note here how the documents coming out of $unwind have the same structure as the input, except for the unwound array, which has been replaced by a single element. Once we have the unwound documents, we can add some other operators to calculate the number of posts per commenter:
>>> pipeline = [
...     {'$unwind': '$comments'},
...     {'$group': {
...         '_id': '$comments.author',
...         'comments': { '$sum': 1 } } } ]
>>> db.command('aggregate', 'blog.articles', pipeline=pipeline)
{u'ok': 1.0,
 u'result': [
    {u'_id': u'Another commenter', u'comments': 1},
    {u'_id': u'Someone Else', u'comments': 2},
    {u'_id': u'Someone', u'comments': 4}]}
One thing to note is that we can use $group as a sort of reverse-$unwind by using the aggregation function $push. Returning to our synthetic (x,y) example, we can "wind up" all the x values for a given y:
>>> pipeline = [
...     {'$group': {
...         '_id': '$y',
...         'xs': {'$push': '$x'} } },
...     {'$project': {
...         '_id': 0,
...         'y': '$_id',
...         'xs': 1 } },
...     {'$sort': {'y': 1}} ]
>>> db.command('aggregate', 'data', pipeline=pipeline)
{u'ok': 1.0,
 u'result': [
    {u'y': 0,
     u'xs': [
        0.5480646819579362,
        0.06989250292970206,
        0.1483275969049508,
        0.37886136252339875,
        0.944175551482085,
        0.6528854869305418,
        0.6699875222192363,
        0.038172503680066416,
        0.29335286756814327,
        0.8896985443297418,
        0.5501587033310872] },
    ... ] }
Here, we first $grouped to compute the aggregate we are interested in, then applied a $project to rename the fields a bit, finishing up with a $sort to present the results in sorted order.
The new aggregation can't do everything (yet)
So now that you've seen the amazing new aggregation framework, you might be ready to drop everything else and use it. Unfortunately, there are a few things you need to be aware of that can bite you.
There is no $out operator
Sadly, the planned $out operator didn't make it into the 2.1 release of the aggregation framework. What this means is that you can't define a pipeline and have it put its results into another collection; you need to be ready to process the results when the command returns. Because of this, aggregation results are limited to 16MB due to the document size limit in MongoDB. So if you need to compute millions of groups, you're going to be stuck with one of the following options:
- Breaking up your pipeline with $match or $skip/$limit so each pipeline's results are under 16MB (see the sketch after this list)
- Falling back to mapreduce
- Pulling the data out of MongoDB and doing the aggregations in some external system (this is the approach used in Zarkov, for instance)

None of these options is as convenient as an $out operator would be, since $out would allow all the computation to happen on the server.
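Here's a rough sketch of the first workaround above, running the same $group once per slice of y values so that no single command's result gets anywhere near the 16MB limit. The slice boundaries are made up for this example, and db and the data collection are the ones from earlier in the post:

# run the aggregation in slices, accumulating the partial results client-side
results = []
for low, high in [(0, 3), (4, 7), (8, 10)]:
    pipeline = [
        {'$match': {'y': {'$gte': low, '$lte': high}}},
        {'$group': {'_id': '$y', 'mean': {'$avg': '$x'}}} ]
    results.extend(db.command('aggregate', 'data', pipeline=pipeline)['result'])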
There is no explain() (yet)
If you're interested in how your queries scale in MongoDB (and you should be), you've probably become quite accustomed to the explain() command. Unfortunately, there is no corresponding command for the new aggregation framework unless you're using the nightly builds. This means you have to be careful when constructing your pipelines. The rules of thumb I use are:
- Put your $match operator first. This operator can use an index (yay!), and you can actually test that by using the same query with find().explain() (see the sketch after this list).
- If possible, put your $sort immediately after the $match. This makes the first two stages behave just like a find().sort(), which again is explain()able.
- Be careful with the stateful operators. Most pipeline operators are stateless, so their memory usage does not change with the length of the pipeline. But $group and $sort are not, since they accumulate results in RAM. So be careful where you put them.
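As an example of the first rule of thumb, here's the kind of sanity check I might run. It's only a sketch, assuming an index on y and the data collection from above: run the $match's query through find().explain() and confirm it hits the index before putting that query at the head of the pipeline.

db.data.ensure_index('y')
query = {'y': {'$gte': 5}}

# old-style explain() output: 'cursor' should be something like u'BtreeCursor y_1'
print(db.data.find(query).explain()['cursor'])

pipeline = [
    {'$match': query},   # same query, now as the first pipeline stage
    {'$group': {'_id': '$y', 'mean': {'$avg': '$x'}}} ]
db.command('aggregate', 'data', pipeline=pipeline)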
If you're up for using the nightly builds (or once the explain feature makes it into an official release), the always-awesome Chris Westin points out that you can use explain by passing it as an additional parameter to your aggregation command:
>>> db.command('aggregate', 'data', pipeline=pipeline, explain=True)
...
There are a number of other features in the new aggregation framework that I haven't covered here. For more details, visit the official docs. I'd be interested in hearing what you think about the new aggregation framework, particularly if anyone is using it in production (or pre-production). Let me know in the comments below!
explain is now in the development builds. Add explain:true to your aggregation command.
Thanks for the info, Chris. I've updated the post to reflect that information.
And thanks again for the framework!
Suppose I want to compute log N in the pipeline; the operators I see are $add, $multiply, etc. Where can I find the functions supported by aggregation? Not all computation involves those simple operators, maybe a power function. How would I do that in aggregation?
Thanks for the comment! Currently, only simple arithmetic expressions are supported by the aggregation framework (http://docs.mongodb.org/manual/reference/aggregation/#expressions). From what I understand, it's straightforward to add new operators, so if there are particular things you're interested in, I'd recommend creating a JIRA ticket at https://jira.mongodb.org