map/reduce jobs. One of the things I noticed about map/reduce, at
least as I've implemented it in Zarkov, is that it's pretty
inefficient if you want t o generate several aggregate views of the
same data (like say an event stream). In order to meet Zarkov's
performance requirements without making my operations team
too angry, I decided to "extend" map/reduce, as it's
implemented in Zarkov. I'm not terribly creative, so I called this
command
xmapreduce
. Here' I'll briefly describemap/reduce, show the problem with it, and explain the solution
implemented in Zarkov.
Map/Reduce
The map/reduce algorithm is nice because a) it'sextremely scalable and b) it's pretty easy to grasp the basics. The
idea is that you write two functions,
map
andreduce
which will be applied to a large dataset to getyourself some interesting data. For our purposes here, I'll
illustrate using Zarkov map/reduce.
Let's say you want to see how many timestamped objects in some
collection exist for each date. Here's how you'd write your Zarkov
map/reduce functions:
def map(objects): for object in objects: yield object['timestamp'].date(), 1 def reduce(key, values): return sum(values)
(basically) the following (but efficiently distributed over several
workers):
import operator, itertools def map_reduce(input_collection, query, output_collection, map, reduce): objects = input_collection.find(query) map_results = list(map(objects)) map_results.sort(key=operator.itemgetter(0)) for key, kv_pairs in itertools.groupby(map_results, operator.itemgetter(0)): value = reduce(key, [ v for k,v in kv_pairs ]) output_collection.save({"_id":key, "value":value})
The Problem with Map/Reduce
The problem arises when you have several output collections thatall depend on a single input collection (or even depend on the same
documents in the input collection). Map/reduce can get you
there, bit it'll execute the query once for each output collection,
and it needs to distribute the data to the
map()
functions once for every output collection. This isn't too
efficient.
Let's extend our example above by tracking the date, month, and
year of each object. In classic map/reduce, we'd need three
map
functions:def map_date(objects): for object in objects: yield object['timestamp'].date(), 1 def map_month(objects): for object in objects: yield object['timestamp'].date().replace(day=1), 1 def map_year(objects): for object in objects: yield object['timestamp'].date().replace(month=1,day=1), 1 def reduce(key, values): return sum(values)
duplicated effort. Particularly, Zarkov must query the input
collection three times and transfer the data to map workers three
times. (There's also duplicate serialization/deserialization of the
input objects, though the Python
bson
library is quitefast.)
XMapReduce to the Rescue
The solution Zarkov uses for this case is to allow themap
function to return a target collectionalong with key and value. What
xmapreduce
does istransform the map functions above into one
xmap
function, and tweaking our
reduce
function just a bitto take the collection as an input:
def xmap(objects): for object in objects: yield { 'c':'by_date', 'k':object['timestamp'].date(), 'v':1 } yield { 'c':'by_month', 'k':object['timestamp'].date().replace(day=1), 'v':1 } yield { 'c':'by_year', 'k':object['timestamp'].date().replace(day=1, month=1), 'v':1 } def xreduce(collection, key, values): return sum(values)
the map/reduce algorithm:
import operator, itertools def xmap_reduce(input_collection, query, output_db, map, reduce): objects = input_collection.find(query) map_results = list(map(objects)) keyfunc = lambda doc:(doc['c'], doc['k']) map_results.sort(key=keyfunc) for (c,k), docs in itertools.groupby(map_results, keyfunc): value = reduce(c,k, [ doc['v'] for doc in docs ])span> output_db[c].save({"_id":key, "value":value})
aggregates, with close to a 3x reduction in data transfer and a
significant speedup. Assuming that a) your jobs can be
combined into a single xmapreduce job and b) you find it acceptable
to code the target collection in your map jobs, xmapreduce should
give you a significant speedup over classic map/reduce. In our
case, we started with 12 map/reduce jobs and ended up with 4
xmapreduce jobs, with a tremendous speedup, but of course your
mileage may vary. Happy hacking!
Thank you for this series, and for opening your code.
ReplyDeleteA subject is still quite mysterious to me in the MapReduce model: the shuffle. It would be great if you could do an article on how you manage this in Zarkov.
Very briefly, the shuffle is how you guarantee that all the data for a given key is sent to the correct reduce method. Zarkov previously handled this by sorting (and grouping) the map results by key before sending them out for reduce. This was discovered to be a bottleneck.
ReplyDeleteThis has been enhanced in more recent versions by using a fixed number of 'reduce' workers and having the map perform hashing on the keys to determine which reduce worker the job goes to. During the reduce phase, each reduce worker groups the objects by key, effectively parallelizing the previously serial bottleneck of the sort. Does that help?