Tuesday, July 26, 2011

Extending Zarkov's Map/Reduce

Since I've been working on Zarkov, I've been writing a few
map/reduce jobs. One of the things I noticed about map/reduce, at
least as I've implemented it in Zarkov, is that it's pretty
inefficient if you want t o generate several aggregate views of the
same data (like say an event stream). In order to meet Zarkov's
performance requirements without making my operations team
too angry, I decided to "extend" map/reduce, as it's
implemented in Zarkov. I'm not terribly creative, so I called this
command xmapreduce. Here' I'll briefly describe
map/reduce, show the problem with it, and explain the solution
implemented in Zarkov.

Map/Reduce

The map/reduce algorithm is nice because a) it's
extremely scalable and b) it's pretty easy to grasp the basics. The
idea is that you write two functions, map and
reduce which will be applied to a large dataset to get
yourself some interesting data. For our purposes here, I'll
illustrate using Zarkov map/reduce.
Let's say you want to see how many timestamped objects in some
collection exist for each date. Here's how you'd write your Zarkov
map/reduce functions:
def map(objects): 
    for object in objects: 
        yield object['timestamp'].date(), 1 
 
def reduce(key, values): 
    return sum(values) 
The map/reduce framework will take these functions and do
(basically) the following (but efficiently distributed over several
workers):
import operator, itertools 
 
def map_reduce(input_collection, query, output_collection, map, reduce): 
    objects = input_collection.find(query) 
    map_results = list(map(objects)) 
    map_results.sort(key=operator.itemgetter(0)) 
    for key, kv_pairs in itertools.groupby(map_results, operator.itemgetter(0)): 
        value = reduce(key, [ v for k,v in kv_pairs ]) 
        output_collection.save({"_id":key, "value":value}) 
And that's pretty much all there is to it (conceptually).

The Problem with Map/Reduce

The problem arises when you have several output collections that
all depend on a single input collection (or even depend on the same
documents in the input collection). Map/reduce can get you
there, bit it'll execute the query once for each output collection,
and it needs to distribute the data to the map()
functions once for every output collection. This isn't too
efficient.
Let's extend our example above by tracking the date, month, and
year of each object. In classic map/reduce, we'd need three
map functions:
def map_date(objects): 
    for object in objects: 
        yield object['timestamp'].date(), 1 
 
def map_month(objects): 
    for object in objects: 
        yield object['timestamp'].date().replace(day=1), 1 
 
def map_year(objects): 
    for object in objects: 
        yield object['timestamp'].date().replace(month=1,day=1), 1 
 
def reduce(key, values): 
    return sum(values) 
Now, if we treat these three jobs separately, we get a lot of
duplicated effort. Particularly, Zarkov must query the input
collection three times and transfer the data to map workers three
times. (There's also duplicate serialization/deserialization of the
input objects, though the Python bson library is quite
fast.)

XMapReduce to the Rescue

The solution Zarkov uses for this case is to allow the
map function to return a target collection
along with key and value. What xmapreduce does is
transform the map functions above into one xmap
function, and tweaking our reduce function just a bit
to take the collection as an input:
def xmap(objects): 
    for object in objects: 
        yield { 
            'c':'by_date', 
            'k':object['timestamp'].date(), 
            'v':1 } 
        yield { 
            'c':'by_month', 
            'k':object['timestamp'].date().replace(day=1), 
            'v':1 } 
        yield { 
            'c':'by_year', 
            'k':object['timestamp'].date().replace(day=1, month=1), 
            'v':1 } 
 
def xreduce(collection, key, values): 
    return sum(values) 
The xmapreduce algorithm is just a slight bit more complex than
the map/reduce algorithm:
import operator, itertools 
 
def xmap_reduce(input_collection, query, output_db, map, reduce): 
    objects = input_collection.find(query) 
    map_results = list(map(objects)) 
    keyfunc = lambda doc:(doc['c'], doc['k']) 
    map_results.sort(key=keyfunc) 
    for (c,k), docs in itertools.groupby(map_results, keyfunc): 
        value = reduce(c,k, [ doc['v'] for doc in docs ])span> 
        output_db[c].save({"_id":key, "value":value}) 

Now, we can invoke a single job to calculate all three
aggregates, with close to a 3x reduction in data transfer and a
significant speedup. Assuming that a) your jobs can be
combined into a single xmapreduce job and b) you find it acceptable
to code the target collection in your map jobs, xmapreduce should
give you a significant speedup over classic map/reduce. In our
case, we started with 12 map/reduce jobs and ended up with 4
xmapreduce jobs, with a tremendous speedup, but of course your
mileage may vary. Happy hacking!

2 comments:

  1. Anonymous4:29 PM

    Thank you for this series, and for opening your code.

    A subject is still quite mysterious to me in the MapReduce model: the shuffle. It would be great if you could do an article on how you manage this in Zarkov.

    ReplyDelete
  2. Very briefly, the shuffle is how you guarantee that all the data for a given key is sent to the correct reduce method. Zarkov previously handled this by sorting (and grouping) the map results by key before sending them out for reduce. This was discovered to be a bottleneck.

    This has been enhanced in more recent versions by using a fixed number of 'reduce' workers and having the map perform hashing on the keys to determine which reduce worker the job goes to. During the reduce phase, each reduce worker groups the objects by key, effectively parallelizing the previously serial bottleneck of the sort. Does that help?

    ReplyDelete