Over the past few weeks I've been working on a service in Python that I'm calling, in the tradition of naming projects after characters in Flash Gordon, Zarkov. So what exactly is Zarkov? Well, Zarkov is many things (and may grow to be more):
- Zarkov is an event logger
- Zarkov is a lightweight map-reduce framework
- Zarkov is an aggregation service
- Zarkov is a webservice
In my previous post, I discussed Zarkov as an event logger. While this may be useful (say for logging to a central location from several different servers), there's a bit more to Zarkov. Today I'll focus on the map-reduce framework provided by Zarkov. If you want instructions on setting up Zarkov or getting events into it, please see my previous post.
Why Another Map-Reduce Framework?
Good question. Currently there exist two big frameworks of which I'm aware: Hadoop and Disco. Hadoop is a framework written in Java that includes all sorts of goodness to make sure that your larger-than-life job gets executed relatively efficiently on a large cluster. Disco provides much the same. Both of them require a good bit of setup to get running. Both of them provide a distributed filesystem that they really like to get jobs from. Neither has tight MongoDB integration.
I wanted something smaller. Something easy to configure, with tight MongoDB integration. "But wait," I hear you say, "MongoDB has built-in map-reduce!" In fact, Zarkov began life as a front-end for MongoDB's own map-reduce. Unfortunately, there are a few problems with the built-in mapreduce command in MongoDB:
- it saves its intermediate results in MongoDB collections (lots of locking)
- it is restricted to a single thread per mongod process (thanks a lot, Spidermonkey :-p )
- it doesn't allow scaling compute power independent of shards
So I set out to write my own....
Setting up a single-node Zarkov map-reduce (zmr) cluster
This is actually pretty simple. Here's how you do it:
(zarkov) $ # Create the default directory in which zmr stores its temp files
(zarkov) $ mkdir /tmp/zmr
(zarkov) $ zcmd -y development.yaml serve
Yes, that's the same command as the event logger. You specify in the development.yaml file how many worker processes you want to keep "locally", and these will start up automatically with your router. For more complex setups, or to run the zmr framework without the Zarkov event logger, you can do the following:
(zarkov) $ # Start "router" -- this will also start local worker procs
(zarkov) $ zcmd -y development.yaml zmr-router &
Then on your worker nodes:
(zarkov) $ zcmd zmr-worker tcp://worker-ip:5555 &
Note that there's actually no need to specify the config file for the worker process; workers will connect to the router to retrieve their configuration. You do want to make sure that you start all your workers before you start your job in order to correctly load-balance between them.
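As a sketch of what the config mentioned above might contain, here is a hypothetical fragment of development.yaml. The key names are my own guesses based on the behavior described (a request port and a count of local workers) -- check the Zarkov source for the actual schema:

```yaml
# Hypothetical sketch only -- these key names are illustrative,
# not Zarkov's real configuration schema.
zmr:
  req_uri: tcp://0.0.0.0:5555   # port the router answers job requests on
  worker_procs: 4               # "local" workers started with the router
```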
Running a job on a zmr cluster
Running a job is actually pretty simple. You need to connect to the zmr request port (by default this is 5555) on the router using a ZeroMQ REQ socket and submit a BSON message of the following format:
- database: the MongoDB database to read from and write to
- collection: the collection that forms the input to the job
- query: a bson.Binary() containing a BSON-encoded query object
- map: a Python source code string containing the map function
- reduce: a Python source code string containing the reduce function
- map_name: the name of the function in the "map" param
- reduce_name: the name of the function in the "reduce" param
- out: the collection in which to store the result of the job
- out_type: replace, merge, or reduce, corresponding to the MongoDB mapreduce command options
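Putting that message format together, a job submission could be sketched like this in Python. The build_job and submit_job helpers and the field values are my own illustrations, not part of Zarkov's API; submit_job assumes pyzmq and pymongo's bson module are installed:

```python
# The map/reduce source strings here are the 'foo'-counting example
# shown later in the post, passed to the router as plain source text.
MAP_SRC = (
    "def map(objects):\n"
    "    for object in objects:\n"
    "        yield object['foo'], 1\n")
REDUCE_SRC = (
    "def reduce(key, values):\n"
    "    return sum(values)\n")

def build_job(database, collection, query_binary, map_src, reduce_src,
              out, out_type='replace'):
    # Assemble the zmr job message described above. query_binary should
    # already be a bson.Binary wrapping a BSON-encoded query document.
    return {
        'database': database,
        'collection': collection,
        'query': query_binary,
        'map': map_src,
        'reduce': reduce_src,
        'map_name': 'map',
        'reduce_name': 'reduce',
        'out': out,
        'out_type': out_type}

def submit_job(job, uri='tcp://localhost:5555'):
    # Illustrative helper: BSON-encode the job, send it over a ZeroMQ
    # REQ socket to the router's request port, and wait for the reply.
    import zmq
    import bson  # from pymongo
    sock = zmq.Context.instance().socket(zmq.REQ)
    sock.connect(uri)
    sock.send(bson.encode(job))
    return bson.decode(sock.recv())
```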
The map function has the following format:
def map(objects):
    for object in objects:
        new_key, value = do_something_to(object)
        yield new_key, value
Basically, it's a generator that takes a sequence of dicts and yields zero or more (key, dict) pairs. The reduce function has the following format:
def reduce(key, values):
    return one_value_from_many(values)
So for instance, if you wanted to count the number of objects, grouped by the field 'foo', you could do the following:
def map(objects):
    for object in objects:
        yield object['foo'], 1

def reduce(key, values):
    return sum(values)
The result would be a collection with its objects' _id fields equal to the 'foo' values from the input collection and its 'value' fields equal to the number of objects with that 'foo' value.
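To make those semantics concrete, here's a tiny pure-Python simulation of what the framework does with the two functions above -- map every input document, group the yielded values by key, then reduce each group. This is only a sketch of the semantics, not Zarkov's actual distributed implementation:

```python
from collections import defaultdict

def map(objects):
    for object in objects:
        yield object['foo'], 1

def reduce(key, values):
    return sum(values)

def run_local(objects, map_fn, reduce_fn):
    # "Shuffle" phase: group every mapped value under its key.
    grouped = defaultdict(list)
    for key, value in map_fn(objects):
        grouped[key].append(value)
    # Reduce phase: collapse each group to one value, shaped like the
    # output documents described above (_id = key, value = reduced value).
    return [{'_id': k, 'value': reduce_fn(k, vs)}
            for k, vs in grouped.items()]

docs = [{'foo': 'a'}, {'foo': 'b'}, {'foo': 'a'}]
results = run_local(docs, map, reduce)
# results contains {'_id': 'a', 'value': 2} and {'_id': 'b', 'value': 1}
```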