In some previous posts on mongodb and python, pymongo, and gridfs, I introduced the NoSQL database MongoDB, showed how to use it from Python, and showed how to use it to store large (more than 16 MB) files. Here, I'll be showing you a few of the features that the current (2.0) version of MongoDB includes for performing aggregation. In a future post, I'll give you a peek into the new aggregation framework included in MongoDB version 2.1.
Basic aggregation
Included in MongoDB are three operators useful for doing basic multi-document queries and aggregations: count(), distinct(), and group().
Count
count() is the most basic of the three: it returns the number of documents that match the cursor's query. To illustrate this, first we'll create some documents in a test collection:
>>> import random
>>> import pymongo
>>> conn = pymongo.Connection()
>>> db = conn.agg
>>> db.data.insert([
...     dict(x=random.random(), y=random.randint(0, 10))
...     for i in range(50)])
[ObjectId(...)]
Now, let's count the documents where y is 3:
>>> db.data.find({'y':3}).count()
3
This is normally what we want, but what if our cursor is using a skip() or limit()? count() ignores those:
>>> q = db.data.find({'y':3}).skip(1)
>>> q.count()
3
>>> len(list(q))
2
If you want to take skips and limits into account, you'll have to do a trivial amount of calculation yourself:
def the_real_count(count, skip=0, limit=None):
    # Account for skipped documents, clamping at zero...
    result = max(0, count - skip)
    # ...and then cap the result at the limit, if one was given.
    if limit is not None:
        result = min(result, limit)
    return result
Now we can use it:
>>> the_real_count(q.count(), 1)
2
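The same helper works when the cursor has a limit() as well; continuing the session above (the count of 3 comes from the earlier query):

>>> q = db.data.find({'y':3}).skip(1).limit(1)
>>> the_real_count(q.count(), skip=1, limit=1)
1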
Distinct
Another thing that's often useful is checking to see which distinct values a query returns. For this, MongoDB provides the distinct() method:
>>> db.data.find({'y': {'$gt':3}}).distinct('y')
[10, 6, 5, 7, 8, 4, 9]
Note that you need to provide the field you want the distinct values for.
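As an aside, pymongo also exposes a distinct() method directly on the collection, in case you want the distinct values across all documents rather than a query's results. A quick sketch (the exact values returned will depend on the random data you inserted):

>>> db.data.distinct('y')
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]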
Group
With group(), we can perform somewhat more complex calculations. Grouping in MongoDB is basically equivalent to a reduce() operation in Python: an initial value is provided, and then all documents in the group are reduced into it using the reduce function (there's a short pure-Python sketch of this analogy after the parameter list below). The actual group() method takes a number of parameters, most of which must be specified:
key
- This can be one of the following:
  - None, indicating that the entire document is the group key
  - A list of property names indicating the group key
  - A Javascript function which will return the group key when called for each document

condition
- This is a condition used to filter the input to the group operation (i.e. it is equivalent to the WHERE clause in a SQL GROUP BY statement, not the HAVING clause).

initial
- This is the initial object which will be used to 'kick off' the reduction.

reduce
- This is a Javascript function which will be used to combine all the matching documents pairwise until only one remains.

finalize
- This is an optional Javascript function which will be called on the result of the reduction to generate the final value. It might be useful for computing a mean value from a sum and count, for instance.
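Here is the promised pure-Python sketch of the reduce() analogy, showing conceptually what group() does with an initial value and a reduce function (this is an illustration, not how MongoDB implements it; the two documents are made up):

>>> from functools import reduce  # reduce() is also a builtin in Python 2
>>> docs = [{'x': 0.25, 'y': 4}, {'x': 0.5, 'y': 4}]  # one group, key y=4
>>> initial = {'count': 0, 'sum': 0}
>>> def reducer(out, doc):
...     # fold one document into the accumulated result, like the
...     # Javascript reduce function in the example below
...     return {'count': out['count'] + 1, 'sum': out['sum'] + doc['x']}
...
>>> reduce(reducer, docs, initial)
{'count': 2, 'sum': 0.75}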
An example is illustrative. To see the count and sum of all the x values in our collection, grouped by y values, only for y values greater than 3, the query would be the following:
>>> key = [ 'y' ]
>>> condition = { 'y': { '$gt': 3 } }
>>> initial = { 'count': 0, 'sum': 0 }
>>> reduce = 'function(doc, out) { out.count++; out.sum += doc.x; }'
>>> db.data.group(key, condition, initial, reduce)
[{u'y': 10.0, u'count': 6.0, u'sum': 3.9350696527378295},
 {u'y': 6.0, u'count': 10.0, u'sum': 5.312402119253351},
 {u'y': 5.0, u'count': 3.0, u'sum': 0.9414083462732217},
 {u'y': 7.0, u'count': 7.0, u'sum': 2.992867324959482},
 {u'y': 8.0, u'count': 6.0, u'sum': 1.429589852438617},
 {u'y': 4.0, u'count': 5.0, u'sum': 1.7804494395579495},
 {u'y': 9.0, u'count': 3.0, u'sum': 1.6105657361868966}]
If we would also like to return the mean x value, we can include a finalize function:
>>> finalize = 'function(out) { out.mean = out.sum / out.count; }'
>>> db.data.group(key, condition, initial, reduce, finalize)
[{u'y': 10.0, u'count': 6.0, u'sum': 3.9350696527378295, u'mean': 0.6558449421229716},
 {u'y': 6.0, u'count': 10.0, u'sum': 5.312402119253351, u'mean': 0.5312402119253351},
 {u'y': 5.0, u'count': 3.0, u'sum': 0.9414083462732217, u'mean': 0.3138027820910739},
 {u'y': 7.0, u'count': 7.0, u'sum': 2.992867324959482, u'mean': 0.42755247499421173},
 {u'y': 8.0, u'count': 6.0, u'sum': 1.429589852438617, u'mean': 0.23826497540643618},
 {u'y': 4.0, u'count': 5.0, u'sum': 1.7804494395579495, u'mean': 0.3560898879115899},
 {u'y': 9.0, u'count': 3.0, u'sum': 1.6105657361868966, u'mean': 0.5368552453956322}]
Warning
You might be thinking that group() is awesome, and it's exactly what you need, but there are two things to be aware of with group() which may make it a non-starter for your needs:

- group() cannot be used on sharded collections. It just doesn't work. So if you're doing sharding, you'll have to go the mapreduce() route (below).
- group(), since it uses Javascript, uses the SpiderMonkey global interpreter lock, meaning that all Javascript, whether it's a db.eval call, group, mapreduce, or $where in a query, must be serialized. So don't plan on doing a lot of group()s concurrently.
If you're only using group() occasionally on a non-sharded collection, it might be fine. For a more general and (IMO) better solution, however, read on for mapreduce:
Map/Reduce
MongoDB provides an implementation of the MapReduce algorithm using the collection-level mapreduce method. Once again, we're using Javascript, so the same caveats apply regarding the global Javascript interpreter lock, but since mapreduce provides support for sharding, we can actually squeeze a bit of parallelism out of it here.
One way to think of mapreduce as used in MongoDB is that it is a generalized group method. The particular differences between mapreduce and group are as follows:
- mapreduce allows us to provide an initial map function which will be used to generate the documents to be grouped by key, whereas group only allowed us to specify a method that returned the group key for a given document.
- mapreduce allows us to keep our results in a collection in the database, rather than returning them immediately the way group does. With mapreduce, we also have great flexibility in how those documents are stored in the collection. We can do one of the following:
  - store the results in their own collection, obliterating any existing collection of the same name,
  - store the results in a possibly pre-existing collection, overwriting any results that are already there, or
  - store the results in a possibly pre-existing collection, reducing the results with any results currently stored there.
So to take up our example from the group discussion above, we can do the same thing in mapreduce as follows:
>>> query = { 'y': { '$gt': 3 } }
>>> map = 'function() { emit(this.y, { count: 1, sum: this.x}); }'
>>> reduce = '''function(key, values) {
...     var result = { count:0, sum: 0 };
...     values.forEach(function(value) {
...         result.count += value.count;
...         result.sum += value.sum; })
...     return result; }'''
>>> finalize = '''function(k, v) {
...     return { sum: v.sum,
...              count: v.count,
...              mean: v.sum/v.count } }
... '''
>>> db.data.map_reduce(map, reduce, {'inline':1}, query=query, finalize=finalize)
{u'counts': {u'input': 40, u'reduce': 7, u'emit': 40, u'output': 7},
 u'timeMillis': 4,
 u'ok': 1.0,
 u'results': [
     {u'_id': 4.0,
      u'value': {u'count': 5.0,
                 u'sum': 1.7804494395579495,
                 u'mean': 0.3560898879115899}},
     ...]}
Note in particular that the structure of our documents has changed. With group, we were working on 'native' documents from our collection. Now we are working with "keys" and "values", where the "keys" are whatever was the first argument to emit in our map function, and the "values" are whatever was the second argument.
The actual mapreduce method takes quite a few parameters in addition to the required map, reduce, and out:
map
- required. The Javascript function that calls emit(key, value) for each input document to the reduce. Note that in the map function, this refers to the current document being mapped.

reduce
- required. The Javascript function that takes a key and an array of values and which should return the reduction of those values. Note that reduce may be called multiple times for a given key, and may be used to further reduce an already reduced value, so the value returned from reduce should be similar in structure to the values emitted from map.

out
- required (at least in Python). An object specifying what to do with the results: whether to return them inline, or to put them into a collection, and how to handle putting them into a collection. Options are listed below.

query
- optional. A MongoDB query used to filter the collection before sending its documents to the map function.

sort
- optional. This will cause the input to map to be sorted. Specifying a sort key that groups the documents by the key that map will emit() can lead to fewer reduce calls. Strictly a performance optimization unless limit is used, below.

limit
- optional, not sharding-compatible. Limits the number of documents sent to map.

finalize
- optional. The Javascript function called on the results of reduce. Useful for generating secondary aggregates such as the mean.

scope
- optional. Used to set the scope variable for map, reduce, and finalize.

sharded
- If True, then shard the output collection on its _id field.
There are a few others, but they're less commonly used. The official docs give more details.
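To illustrate a few of the optional parameters in one call, here's a sketch. The variable names scaled_map and sum_reduce are my own, scope injects a constant named factor into the Javascript, and Array.sum is a helper available in MongoDB's server-side Javascript:

>>> scaled_map = 'function() { emit(this.y, this.x * factor); }'
>>> sum_reduce = 'function(key, values) { return Array.sum(values); }'
>>> result = db.data.map_reduce(
...     scaled_map, sum_reduce, {'inline': 1},
...     query={'y': {'$gt': 3}},   # filter input documents first
...     sort={'y': 1},             # group the input by the emitted key
...     limit=20,                  # only feed 20 documents to map
...     scope={'factor': 100})     # 'factor' is visible inside the JS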
The out parameter is one that deserves a bit more explanation. It specifies what to do with the result of the mapreduce job, and can be in one of the following formats:
{'replace': <collection name>}
- The output is inserted into a collection, atomically replacing any existing collection with the same name.

{'merge': <collection name>}
- The output is inserted into a collection, overwriting any values with the same key but not removing the entire collection as replace does.

{'reduce': <collection name>}
- The contents of the collection are treated as inputs to the reduce step. This option allows for incremental updates to an output collection.
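The reduce output mode is what enables incremental aggregation: build the output collection once, then periodically fold in only the new documents rather than recomputing everything. A sketch, reusing the map, reduce, and query from above (last_seen_id is a hypothetical marker you would track yourself, e.g. the largest _id already processed):

>>> # initial run: build the output collection from scratch
>>> result = db.data.map_reduce(map, reduce, {'replace': 'agg_result'},
...                             query=query)
>>> # later: reduce only newly-inserted documents into the existing results
>>> result = db.data.map_reduce(
...     map, reduce, {'reduce': 'agg_result'},
...     query={'y': {'$gt': 3}, '_id': {'$gt': last_seen_id}})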
You can also specify that the result should be written to another database using the out parameter. For this, the order of keys matters, and if you're using Python, you should use a collections.OrderedDict or bson.SON object:
>>> db.data.map_reduce(map, reduce, bson.SON([
...     ('replace', 'agg_result'),
...     ('db', 'otheragg')
... ]), query=query, finalize=finalize)
Collection(Database(Connection('localhost', 27017), u'otheragg'), u'agg_result')
>>> conn.otheragg.agg_result.count()
7
>>> conn.otheragg.agg_result.find_one()
{u'_id': 4.0,
 u'value': {u'count': 5.0,
            u'sum': 1.7804494395579495,
            u'mean': 0.3560898879115899}}
Mapreduce and Sharding
If you need to get any parallelism out of your mapreduce, you're going to need to do it on a sharded cluster. If your input collection is sharded, then MongoDB will send a mapreduce job to each shard to be executed in parallel. Once all the shards are done, a final reduce/finalize will be run and the output collection will be written.
The details on how this happens in different versions vary, but the short answer is that if you're using MongoDB versions under 2.1/2.2, you should not shard your output. In 2.2 and above, sharded output of mapreduce has been overhauled and has much better performance/parallelism.
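On 2.2 and above, requesting sharded output is a matter of adding the sharded flag to the out document; a sketch (assumes bson is imported and that sharding is enabled for the output database):

>>> result = db.data.map_reduce(
...     map, reduce,
...     bson.SON([('replace', 'agg_result'),
...               ('sharded', True)]),   # shard the output on _id
...     query=query)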
So what do you think? Anyone using any of the aggregation features in MongoDB? Interested in doing so? I'd love to hear about it in the comments below.