Wednesday, May 30, 2012

Aggregation in MongoDB (Part 1)

In some previous posts on mongodb and python, pymongo, and gridfs, I introduced the NoSQL database MongoDB how to use it from Python, and how to use it to store large (more than 16 MB) files in it. Here, I'll be showing you a few of the features that the current (2.0) version of MongoDB includes for performing aggregation. In a future post, I'll give you a peek into the new aggregation framework included in MongoDB version 2.1.

Basic aggregation

Included in MongoDB are three operators useful for doing basic multi-document queries and aggregations: count(), distinct(), and group().

Count

count() is most basic. count() returns the number of documents that match the cursor's query. To illustrate this, first we'll create some documents in a test collection:

>>> import random
>>> import pymongo
>>> conn = pymongo.Connection()
>>> db = conn.agg
>>> db.data.insert([
...     dict(x=random.random(), y=random.randint(0, 10)) 
...     for i in range(50)])
[ObjectId(...)]

Now, let's count the documents where y is 3:

>>> >>> db.data.find({'y':3}).count()
3

This is normally what we want, but what if our cursor is using a skip() or limit()? count() ignores those:

>>> q = db.data.find({'y':3}).skip(1)
>>> q.count()
3
>>> len(list(q))
2

If you want to take skips and limits into account, you'll have to do some (a trivial amount) of calculation yourself:

def the_real_count(count, skip=0, limit=None):
    result = max(0, count - skip)
    if limit is not None:
        result = min(result, limit)
    return result

Now we can use it:

>>> the_real_count(q.count(), 1)
2

Distinct

Another thing that's often useful is checking to see which distinct values a query returns. For this, MongoDB provides the distinct() method:

>>> db.data.find({'y': {'$gt':3}}).distinct('y')
[10, 6, 5, 7, 8, 4, 9]

Note that you need to provide the field you want the distinct values for.

Group

With group(), we can perform somewhat more complex calculations. Grouping in MongoDB is basically equivalent to a reduce() operation in Python; an initial value is provided, and then all documents in the group are reduced into it using the reduce function. The actual group() function takes a number of parameters, most of which must be specified:

key
This can be one of the following:
  • None, indicating that the entire document is the group key
  • A list of property names indicating the group key
  • A Javascript function which will return the group key when called for each document
condition
This is a condition used to filter the input to the group operation (i.e. it is equivalent to the WHERE clause in a SQL statement GROUP BY, not the HAVING caluse.
initial
This is the initial object which will be used to 'kick off' the reduction.
reduce
This is a Javascript function which will be used to combine all the matching documents pairwise until only one remains.
finalize
This is an optional Javascript function which will be called on the result of the reduction to generate the final value. It might useful for computing a mean value from a sum and count, for instance.

An example is illustrative. To see the count and sum of all the x values in our collection, grouped by y values, only for y values greater than 3, the query would be the following:

>>> key = [ 'y' ]
>>> condition = { 'y': { '$gt': 3 } }
>>> initial = { 'count': 0, 'sum': 0 }
>>> reduce = 'function(doc, out) { out.count++; out.sum += doc.x; }'
>>> db.data.group(key, condition, initial, reduce)
[{u'y': 10.0, u'count': 6.0, u'sum': 3.9350696527378295}, 
 {u'y': 6.0, u'count': 10.0, u'sum': 5.312402119253351}, 
 {u'y': 5.0, u'count': 3.0, u'sum': 0.9414083462732217}, 
 {u'y': 7.0, u'count': 7.0, u'sum': 2.992867324959482}, 
 {u'y': 8.0, u'count': 6.0, u'sum': 1.429589852438617}, 
 {u'y': 4.0, u'count': 5.0, u'sum': 1.7804494395579495}, 
 {u'y': 9.0, u'count': 3.0, u'sum': 1.6105657361868966}]

If we would also like to return the mean x value, we can include a finalize function:

>>> finalize = 'function(out) { out.mean = out.sum / out.count; }'
>>> db.data.group(key, condition, initial, reduce, finalize)
[{u'y': 10.0, u'count': 6.0, u'sum': 3.9350696527378295, u'mean': 0.6558449421229716}, 
 {u'y': 6.0, u'count': 10.0, u'sum': 5.312402119253351, u'mean': 0.5312402119253351}, 
 {u'y': 5.0, u'count': 3.0, u'sum': 0.9414083462732217, u'mean': 0.3138027820910739}, 
 {u'y': 7.0, u'count': 7.0, u'sum': 2.992867324959482, u'mean': 0.42755247499421173}, 
 {u'y': 8.0, u'count': 6.0, u'sum': 1.429589852438617, u'mean': 0.23826497540643618}, 
 {u'y': 4.0, u'count': 5.0, u'sum': 1.7804494395579495, u'mean': 0.3560898879115899}, 
 {u'y': 9.0, u'count': 3.0, u'sum': 1.6105657361868966, u'mean': 0.5368552453956322}]

Warning

You might be thinking that group() is awesome, and it's exactly what you need, but there are two things to be aware of with group() which may make it a non-starter for your needs:

  • group() cannot be used on sharded collections. It just doesn't work. So if you're doing sharding, you'll have to go the mapreduce() route (below).
  • group(), since it uses Javascript, uses the SpiderMonkey global interpreter lock, meaning that all Javascript, whether it's a db.eval call, group, mapreduce, or $where in a query, must be serialized. So don't plan on doing a lot of group()s concurrently.

If you're only using group() occasionally on a non-sharded collection, it might be fine. For a more general and (IMO) better solution, however, read on for mapreduce:

Map/Reduce

MongoDB provides an implementation of the MapReduce algorithm using the collection-level mapreduce method. Once again, we're using Javascript, so the same caveats apply regarding the global Javascript interpreter lock, but since mapreduce provides support for sharding, we can actually squeeze a bit of parallelism out of it here.

One way to think of mapreduce as used in MongoDB is that it is a generalized group method. The particular differences between mapreudce and group are as follows:

  • mapreduce allows us to provide an initial map function which will be used to generate documents to be grouped by key, whereas group only allowed us to specify a method that returned the key for a given document.
  • mapreduce allows us to keep our results in a collection in the database, rather than returning them immediately the way group does. With mapreduce, we also have great flexibility on how those documents are stored in the collection. We can do one of the following:
  • store the results in their own collection, obliterating any existing collection of the same name,
  • store the results in a possibly pre-existing collection, overwriting any results that are already there, or
  • store the results in a possibly pre-existing collection, reducing the results with any results currently stored there.

So to take up our example from the group discussion above, we can do the same thing in mapreduce as follows:

>>> query = { 'y': { '$gt': 3 } }
>>> map = 'function() { emit(this.y, { count: 1, sum: this.x}); }'
>>> reduce = '''function(key, values) {
...     var result = { count:0, sum: 0 };
...     values.forEach(function(value) {
...         result.count += value.count;
...         result.sum += value.sum; })
...     return result; }'''
>>> finalize = '''function(k, v) { 
...     return { sum: v.sum, 
...              count: v.count,
...              mean: v.sum/v.count } }
... '''
>>> db.data.map_reduce(map, reduce, {'inline':1}, query=query, finalize=finalize)
{u'counts': {u'input': 40, 
             u'reduce': 7, 
             u'emit': 40, 
             u'output': 7}, 
 u'timeMillis': 4, 
 u'ok': 1.0, 
 u'results': [
   {u'_id': 4.0, u'value': {
     u'count': 5.0, 
     u'sum': 1.7804494395579495, 
     u'mean': 0.3560898879115899}}, ...]}

Note in particular that the structure of our documents has changed. With group, we were working on 'native' documents from our collection. Now we are working with "keys" and "values", where the "keys" are whatever was the first argument to emit in our map fuction, and the "values" are whatever was the second argument.

The actual mapreduce method takes quite a few parameters in addition to the required map, reduce, and out:

map
required The Javascript function that calls emit(key, value) for each input document to the reduce. Note that in the map function, this refers to the current document being mapped.
reduce
required The Javascript function that takes a key and an array of values and which should return the reduction of those values. Note that reduce may be called multiple times for a given key, and may be used to further reduce an already reduced value, so the value returned from reduce should be similar in structure to the values emitted from map.
out
required (at least in Python) An object specifying what to do with the results, whether to return them inline, or to put them into a collection, and how to handle putting them into a collection. Options are listed below.
query
optional A MongoDB query used to filter the collection before sending its documents to the map function.
sort
optional This will cause the input to map to be sorted. Specifying a sort key that groups the documents by the key that map will emit() can lead to fewer reduce calls. Strictly a performance optimization unless limit is used, below.
limit
optional, not sharding-compatible Limits the number of documents send to map.
finalize
optional The Javascript function called on the results of reduce. Useful, for generating secondary aggregates such as mean.
scope
optional Used to set the scope variable for map, reduce, and finalize
sharded
If True, then shard the output collection on its _id field.

There are a few others, but they're less commonly used. The official docs give more details.

The out parameter is one that deserves a bit more explanation. It specifies what to do with the result of the mapreduce job, and can be of one of the following formats:

{'replace': <collection name>}
The output is inserted into a collection, atomically replacing any existing collection with the same name.
{'merge': <collection name>}
The output is inserted into a collection, overwriting any values with the same key but not removing the entire collection as replace does.
{'reduce': <collection name>}
The contents of the collection are treated as inputs to the reduce step. This option allows for incremental updates to an output collection.

You can also specify that the result should be written to another database using the the out parameter. For this, the order of keys matters and if you're using Python, you should use a collections.ordereddict or bson.SON object:

>>> db.data.map_reduce(map, reduce, bson.SON([
...     ('replace', 'agg_result'), 
...     ('db', 'otheragg')
... ]), query=query, finalize=finalize)
Collection(Database(Connection('localhost', 27017), u'otheragg'), u'agg_result')
>>> conn.otheragg.agg_result.count()
7
>>> conn.otheragg.agg_result.find_one()
{u'_id': 4.0, 
 u'value': {
     u'count': 5.0, 
     u'sum': 1.7804494395579495, 
     u'mean': 0.3560898879115899}}

Mapreduce and Sharding

If you need to get any parallelism out of your mapreduce, you're going to need to do it on a sharded cluster. If your input collection is sharded, then MongoDB will send a mapreduce job to each shard to be executed in parallel. Once all the shards are done, a final reduce/finalize will be run and the output collection will be written.

The details on how this happens in different versions vary, but the short answer is that if you're using MongoDB versions under 2.1/2.2, you should not shard your output. In 2.2 and above, sharded output of mapreduce has been overhauled and has much better performance/parallelism.

So what do you think? Anyone using any of the aggregation features in MongoDB? Interested in doing so? I'd love to hear about it in the comments below.

3 comments:

  1. What a document...Its great..helped me a lot...Thanks

    ReplyDelete
  2. Anonymous2:14 AM

    AWESOME: Great document for mapreduce in python

    ReplyDelete