Over the past few weeks I've been working on a service in Python that I'm calling, in the tradition of naming projects after characters in Flash Gordon, Zarkov. So what exactly is Zarkov? Well, Zarkov is many things (and may grow to more):
- Zarkov is an event logger
- Zarkov is a lightweight map-reduce framework
- Zarkov is an aggregation service
- Zarkov is a webservice
In my first post, I discussed Zarkov as an event logger. Next, I explored Zarkov as a map-reduce framework. Today, I'll focus on Zarkov as an aggregation service suitable for doing near real-time analytics.
Aggregation Definitions
In Zarkov, aggregations are defined in the agg_def collection. Here's the Ming schema for that collection:
# Aggregations defined over (by default) the 'event' collection
agg_def = collection(
    'agg_def', doc_session,
    Field('_id', S.ObjectId()),
    # Name for the aggregation
    Field('name', str),
    # Input collection to aggregate
    Field('object', str, if_missing='event'),
    # Pointer to python Aggregation subclass
    Field('task_name', str),
    # Timestamp of last aggregation
    Field('last_run', datetime, if_missing=datetime(1970, 1, 1)))

class AggDef(object):
    pass  # some helper methods

orm_session.mapper(AggDef, agg_def)
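For concreteness, a document stored under this schema might look like the following sketch (the field values are purely illustrative, not taken from a real deployment):

```python
from datetime import datetime

# A hypothetical agg_def document, matching the Ming schema above.
sample_agg_def = {
    'name': 'my_agg_def',              # name for the aggregation
    'object': 'event',                 # if_missing default: aggregate zarkov.event
    'task_name': 'my.tasks.MyTask',    # dotted path to a Task subclass
    'last_run': datetime(1970, 1, 1),  # epoch sentinel: never run
}
```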
An AggDef defines an input collection ('object'), a human-readable 'name', and a 'task_name' identifying the logic behind the aggregation. The task_name is a dotted name, suitable for use in Python's import statement, that points to a 'Task' subclass. There is also a 'last_run' timestamp used to restrict which documents in the object collection (by default zarkov.event) are examined for aggregation. When you request an aggregation from Zarkov, several things happen:
- Zarkov loads the AggDef from MongoDB
- Zarkov imports the Task subclass defined in the AggDef
- Zarkov calls the 'run' method of the Task subclass with a REQ socket connected to the Zarkov map-reduce router
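Step two amounts to resolving a dotted name to a class at runtime. A minimal sketch of that kind of lookup (not Zarkov's actual loader) might be:

```python
import importlib

def load_task_class(task_name):
    """Resolve a dotted name like 'my.tasks.MyTask' to the class it names."""
    module_name, _, class_name = task_name.rpartition('.')
    module = importlib.import_module(module_name)
    return getattr(module, class_name)

# For example, resolving a class from the standard library:
OrderedDict = load_task_class('collections.OrderedDict')
```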
Task Definitions
Normally, you won't override the 'run' method yourself, instead overriding the _start_jobs method:
def _start_jobs(self, socket, full):
    raise NotImplementedError('_start_jobs')
If "full" is true, you'll need to treat this task as a "full recompute" rather than an incremental update. Usually, this just means that you'll be overwriting the output collection rather than reducing your results into it. There are a couple of things that the Task class does for you before calling _start_jobs:
- "New" object documents have been identified. This is done by querying the object collection for documents whose 'aggregates' field does not contain this AggDef's name. In a "full" aggregation, this is done by querying the entire collection. If it is not a "full" aggregation, the query is further restricted to documents that have been inserted into zarkov since the last run of this AggDef.
- "New" object documents have been marked for this via their 'jobs' field. All the documents that may require processing can thus be queried with
{"jobs":self._lock_id}
, whereself
is the Task instance.
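Putting those two bullets together, the bookkeeping boils down to one MongoDB query/update pair. Here's a rough sketch that just builds the two documents; the 'timestamp' field name and the $addToSet update are my assumptions, not Zarkov's actual internals:

```python
def mark_new_documents_query(agg_name, lock_id, last_run, full):
    # Select documents not yet aggregated under this AggDef's name...
    spec = {'aggregates': {'$ne': agg_name}}
    if not full:
        # ...and, for incremental runs, only those inserted since the
        # last run (assumed field name).
        spec['timestamp'] = {'$gt': last_run}
    # Mark them so they can later be fetched with {'jobs': lock_id}.
    update = {'$addToSet': {'jobs': lock_id}}
    return spec, update
```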
In your _start_jobs method, then, you'll mostly be starting Zarkov map-reduce jobs. The Task parent class provides a start_mr() method to help with that:
def start_mr(self, socket, collection, full, filter=None, map=None, reduce=None, map_name=None, reduce_name=None):
The arguments are similar to the fields in the map-reduce request message covered in the previous post. If 'full' is true, the out_mode will be set to 'replace'; otherwise it will be set to 'reduce'. In addition to actually kicking off the map-reduce job, start_mr() notes the job ID in the Task instance so that the run() method can wait until all the started map-reduce jobs are complete before returning.
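The 'reduce' out_mode only works if your reduce function can be re-applied to its own output. A quick sanity check of that property for a sum-style reducer:

```python
def reduce(key, values):  # shadows the builtin, as in the task definitions below
    return sum(values)

values = [1, 1, 1, 1, 1]
full_result = reduce('foo', values)

# Incremental path: reduce an earlier batch, then fold its result
# in with the newly arrived values.
earlier = reduce('foo', values[:3])
incremental = reduce('foo', [earlier] + values[3:])

assert full_result == incremental  # both are 5
```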
So how do you actually invoke an aggregation? Let's assume we have the following task definition in a module 'my.tasks':
from zarkov.task import Task
from zarkov import model as M

class MyTask(Task):

    def _start_jobs(self, socket, full):
        db = M.doc_session.db
        self.start_mr(socket, db.zagg.my_agg, full,
                      map=map, reduce=reduce)

def map(objects):
    for object in objects:
        yield object['foo'], 1

def reduce(key, values):
    return sum(values)
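To see what those two functions compute, you can run them by hand over a few fake event documents; no Zarkov needed, with the grouping step below standing in for the map-reduce framework's shuffle:

```python
from itertools import groupby

def map(objects):  # shadows the builtin, matching the task above
    for object in objects:
        yield object['foo'], 1

def reduce(key, values):
    return sum(values)

events = [{'foo': 'a'}, {'foo': 'b'}, {'foo': 'a'}]
pairs = sorted(map(events))  # [('a', 1), ('a', 1), ('b', 1)]
result = {key: reduce(key, [v for _, v in group])
          for key, group in groupby(pairs, key=lambda p: p[0])}
# result == {'a': 2, 'b': 1}
```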
You can tell Zarkov about the agg via the following code (probably via zcmd shell or zcmd script):
from ming.orm import session
from zarkov import model as M

M.AggDef(name='my_agg_def', task_name='my.tasks.MyTask')
session(M.AggDef).flush()
Running aggregations
Now to trigger the aggregation, just make sure your Zarkov server and/or workers are running and execute the following command:
(zarkov) $ zcmd -y development.yaml aggregate my_agg_def
Wait a bit, and when the command exits, you should have a brand new collection zagg.my_agg with the counts you requested. If you run the aggregate command multiple times, it should reduce in only the new events added since the last aggregation run. And that's all there is to it.