Friday, July 22, 2011

Zarkov is an aggregation service

Over the past few weeks I've been working on a service in Python that I'm calling, in the tradition of naming projects after characters in Flash Gordon, Zarkov. So what exactly is Zarkov? Well, Zarkov is many things (and may grow to more):

In my first post, I discussed Zarkov as an event logger. Next, I explored Zarkov as a map-reduce framework. Today, I'll focus on Zarkov as an aggregation service suitable for doing near real-time analytics.


Aggregation Definitions

In Zarkov, aggregations are defined in the agg_def collection. Here's the Ming schema for that collection:

# Aggregations defined over (by default) the 'event' collection 
agg_def = collection( 
    'agg_def', doc_session, 
    Field('_id', S.ObjectId()), 
    # Name for the aggregation 
    Field('name', str), 
    # Input collection to aggregate 
    Field('object', str, if_missing='event'), 
    # Pointer to python Aggregation subclass 
    Field('task_name', str), 
    # Timestamp of last aggregation 
    Field('last_run', datetime, if_missing=datetime(1970,1,1))) 
 
class AggDef(object): 
    pass  # some helper methods elided 
 
orm_session.mapper(AggDef, agg_def) 

AggDefs basically define an input collection ('object'), a meaningful 'name', and a 'task_name' that defines the logic behind the aggregation. task_name is a dotted name, suitable for use in Python's import statement, that points to a 'Task' subclass. There is also a 'last_run' timestamp that is used to restrict which documents in the object collection (by default zarkov.event) are examined for aggregation. When you request an aggregation from Zarkov, several things happen:

  • Zarkov loads the AggDef from MongoDB
  • Zarkov imports the Task subclass defined in the AggDef
  • Zarkov calls the 'run' method of the Task subclass with a REQ socket connected to the Zarkov map-reduce router

Task Definitions

Normally, you won't override the 'run' method yourself, instead overriding the _start_jobs method:

def _start_jobs(self, socket, full): 
    raise NotImplementedError('_start_jobs') 

If "full" is true, you'll need to treat this task as a "full recompute" rather than an incremental update. Usually, this just means that you'll be overwriting the output collection rather than reducing your results into it. There are a couple of things that the Task class does for you before calling _start_jobs:

  • "New" object documents have been identified. This is done by querying the object collection for documents whose 'aggregates' field does not contain this AggDef's name. In a "full" aggregation, this is done by querying the entire collection. If it is not a "full" aggregation, the query is further restricted to documents that have been inserted into zarkov since the last run of this AggDef.
  • "New" object documents have been marked as claimed by this aggregation run via their 'jobs' field. All the documents that may require processing can thus be queried with {"jobs":self._lock_id}, where self is the Task instance.

In your _start_jobs method, then, you'll mostly be starting Zarkov map-reduce jobs. The Task parent class provides a start_mr() method to help with that:

def start_mr(self, socket, collection, full, filter=None, map=None, reduce=None, 
             map_name=None, reduce_name=None): 

The arguments are similar to the fields in the map-reduce request message covered in the previous post. If 'full' is true, the out_mode will be set to 'replace', otherwise it will be set to 'reduce'. start_mr(), in addition to actually kicking off the map-reduce job, notes the job ID in the Task instance so that the run() method can wait until all the started map-reduce jobs are complete before returning.

So how do you actually invoke an aggregation? Let's assume we have the following task definition in a module 'my.tasks':

from zarkov.task import Task 
from zarkov import model as M 
 
class MyTask(Task): 
 
    def _start_jobs(self, socket, full): 
        db = M.doc_session.db 
        self.start_mr(socket, db.zagg.my_agg, full, 
            map=map, reduce=reduce) 
 
def map(objects): 
    for object in objects: 
        yield object['foo'], 1 
 
def reduce(key, values): 
    return sum(values) 
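To see what this aggregation computes, you can exercise the map and reduce functions directly in plain Python, grouping the mapped key/value pairs by hand (this driver loop is only for illustration; Zarkov's workers do the real shuffling):

```python
from collections import defaultdict

# Same map and reduce as in the task definition above.
def map(objects):
    for object in objects:
        yield object['foo'], 1

def reduce(key, values):
    return sum(values)

events = [{'foo': 'a'}, {'foo': 'b'}, {'foo': 'a'}]

# Group the emitted (key, value) pairs, then reduce each group.
grouped = defaultdict(list)
for key, value in map(events):
    grouped[key].append(value)
counts = {key: reduce(key, values) for key, values in grouped.items()}
# counts == {'a': 2, 'b': 1}
```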

You can tell Zarkov about the agg via the following code (probably via zcmd shell or zcmd script):

from ming.orm import session 
from zarkov import model as M 
 
M.AggDef(name='my_agg_def', task_name='my.tasks.MyTask') 
session(M.AggDef).flush() 

Running aggregations

Now to trigger the aggregation, just make sure your Zarkov server and/or workers are running and execute the following command:

(zarkov) $ zcmd -y development.yaml aggregate my_agg_def

Wait a bit, and when the command exits, you should have a brand new collection zagg.my_agg with the counts you requested. If you run the aggregate command multiple times, it should reduce only the new events added since the last aggregation run. And that's all there is to it.
