Saturday, July 23, 2011

Gevent, ZeroMQ, WebSockets, and Flot FTW!

As part of the work I've been doing on Zarkov, I've had the opportunity to play around with a lot of cool technologies, among which are gevent, ZeroMQ, WebSockets, and Flot. It took a while to get to the point where I could actually get things done, but once I was there, things were incredibly simple. In this post I'll show you how to use these technologies together to build a simple web app with realtime server push data.



Prerequisites

Before you can run the example program, you'll need to install the following Python libraries:

  • pyzmq
  • gevent
  • gevent-zeromq
  • gevent-websocket
  • paste (for a simple WSGI http static file server)

The easiest way to do this is with pip:

pip install pyzmq gevent gevent_zeromq gevent-websocket paste

You may see messages about missing system libraries; you'll need to have libzmq-dev 2.1+ and libevent-dev installed to build the Python packages.

You'll also need to download the Flot jQuery plugin and untar it in your project directory.

Greenlets

One of the things you'll need to understand before running the code is the concept of green threads or greenlets. These behave just like threads except for two caveats:

  • they never run concurrently with one another
  • they only yield to one another at specified points

In practice, this means they're very lightweight to create compared to threads, and you can be a little sloppier with shared data between greenlets, since you know deterministically when a greenlet might yield to another one. The gevent library provides our greenlet support, and makes greenlets even more useful by turning blocking system calls into yield points so that another greenlet can run while one is waiting on data.
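
To make the "yield only at specified points" idea concrete, here is a toy round-robin scheduler built on plain Python generators. It is only an analogy, not how gevent is implemented: the names `worker` and `run_round_robin` are made up for this sketch, and a bare `yield` stands in for a gevent yield point such as `gevent.sleep()`.

```python
from collections import deque


def worker(name, steps, log):
    '''A cooperative task: it runs until it reaches an explicit yield.'''
    for i in range(steps):
        log.append('%s step %d' % (name, i))
        yield  # the only place another task can take over


def run_round_robin(tasks):
    '''Run generator-based tasks one at a time until all have finished.'''
    queue = deque(tasks)
    while queue:
        task = queue.popleft()
        try:
            next(task)
        except StopIteration:
            continue  # this task is done; do not reschedule it
        queue.append(task)


log = []
run_round_robin([worker('a', 2, log), worker('b', 2, log)])
print(log)
```

Because a task can only be interrupted at its `yield`, the interleaving is fully predictable, which is the property that makes shared data easier to reason about than with preemptive threads.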

ZeroMQ

ZeroMQ sounds like it's a message queue like RabbitMQ, but it's really not. ZeroMQ is a library that provides many message queue-like features, but requires no broker and very little setup. We'll be using ZeroMQ pub/sub sockets to manage communication in this demo. Pub/sub sockets have the nice feature that published messages get distributed to all subscribers currently connected, or if no subscribers are connected, the message simply gets dropped.
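
The delivery rule described above can be sketched with a small in-memory stand-in. This is not ZeroMQ code: `ToyPublisher` and `connect_subscriber` are hypothetical names, and the sketch models only the "fan out to whoever is connected right now, otherwise drop" behavior.

```python
class ToyPublisher(object):
    '''In-memory stand-in for a PUB socket (an illustration, not ZeroMQ).'''

    def __init__(self):
        self.subscribers = []

    def connect_subscriber(self):
        '''Register a new subscriber and return its inbox list.'''
        inbox = []
        self.subscribers.append(inbox)
        return inbox

    def send(self, msg):
        # Each currently connected subscriber gets a copy; with no
        # subscribers the loop body never runs and the message is lost.
        for inbox in self.subscribers:
            inbox.append(msg)


pub = ToyPublisher()
pub.send('lost')                   # nobody is connected yet
first = pub.connect_subscriber()
pub.send('one')
second = pub.connect_subscriber()  # a late joiner misses 'one'
pub.send('two')
print(first, second)
```

The practical consequence for this demo is that a browser that connects late simply starts receiving points from that moment on; nothing is queued up for it.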

ZeroMQ was designed to work with threads rather than greenlets, so we'll also need to use the excellent gevent_zeromq library to "greenletize" ZeroMQ. For this example, we'll write a loop that pushes (x,y) point values out on a zmq.PUB socket:

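# Imports needed by the producer (gevent_zeromq supplies the
# gevent-friendly zmq module)
import json
import math
import time

import gevent
from gevent_zeromq import zmq

def zmq_producer(context): 
    '''Produce a nice time series sine wave''' 
    socket = context.socket(zmq.PUB) 
    socket.connect('tcp://127.0.0.1:5000') 
 
    while True: 
        x = time.time() 
        y = 2.5 * (1 + math.sin(x / 500)) 
        socket.send(json.dumps(dict(x=x, y=y))) 
        gevent.sleep(0.1) 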

The context being passed in is created in our main() function with the call zmq.Context(). The context is just a place for ZeroMQ to store some global state; you generally only create one per application.
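
For reference, here is a small sketch of what one producer message looks like and how a consumer might decode it. The helper names `make_point` and `to_flot_point` are hypothetical, not part of the example program. Two things are worth noting: `sin(x / 500)` has a period of about 52 minutes (2π · 500 seconds), so the curve moves slowly, and Flot's documentation describes time-mode values as JavaScript timestamps in milliseconds, whereas `time.time()` returns seconds. The conversion below is one possible adjustment, should the axis labels look off.

```python
import json
import math


def make_point(now):
    '''Build one JSON message the way zmq_producer does, for a given time.'''
    y = 2.5 * (1 + math.sin(now / 500))
    return json.dumps(dict(x=now, y=y))


def to_flot_point(message):
    '''Decode a message into an [x, y] pair with x in milliseconds.'''
    d = json.loads(message)
    return [d['x'] * 1000, d['y']]


msg = make_point(1311400000.0)
point = to_flot_point(msg)
print(msg)
print(point)
```

Since the sine term stays between -1 and 1, `y` always lands between 0 and 5, which matches the fixed y-axis range used on the client side.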

OK, now that we have our producer, let's look at the ZeroMQ server. What we'll do here is just relay messages received on an incoming tcp zmq.SUB socket and publish them on an outgoing inproc zmq.PUB socket:

def zmq_server(context): 
    '''Funnel messages coming from the external tcp socket to an inproc socket''' 
    sock_incoming = context.socket(zmq.SUB) 
    sock_outgoing = context.socket(zmq.PUB) 
    sock_incoming.bind('tcp://*:5000') 
    sock_outgoing.bind('inproc://queue') 
    sock_incoming.setsockopt(zmq.SUBSCRIBE, "") 
    while True: 
        msg = sock_incoming.recv() 
        sock_outgoing.send(msg) 

The only thing to note here is that we have to tell ZeroMQ that we want to subscribe to all messages on the zmq.SUB socket.
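
The reason an empty string subscribes to everything is that SUB sockets filter by prefix: a message is delivered if it starts with any subscribed prefix, and every message starts with the empty prefix. The function below is a plain-Python model of that rule (the name `is_delivered` is made up for illustration; it is not a pyzmq call).

```python
def is_delivered(subscriptions, message):
    '''Return True if any subscribed prefix matches the start of the message.'''
    return any(message.startswith(prefix) for prefix in subscriptions)


everything = is_delivered([b''], b'{"x": 1, "y": 2}')
filtered = is_delivered([b'weather'], b'{"x": 1, "y": 2}')
nothing = is_delivered([], b'{"x": 1, "y": 2}')
print(everything, filtered, nothing)
```

The last case is the one that bites people: a SUB socket with no subscriptions at all receives nothing, which is why the `setsockopt` call is required.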

Gevent WSGI Servers

Gevent provides a fast implementation of the WSGI standard in the gevent.pywsgi module. We'll be using two instances of the server (greenlets are lightweight) in this demo. One will simply serve up static files, while the other one will provide our WebSocket connectivity. Here's the code to set up these two servers:

# websocket server: copies inproc zmq messages to websocket 
ws_server = gevent.pywsgi.WSGIServer( 
    ('', 9999), WebSocketApp(context), 
    handler_class=WebSocketHandler) 
# http server: serves up static files 
http_server = gevent.pywsgi.WSGIServer( 
    ('', 8000), 
    paste.urlparser.StaticURLParser(os.path.dirname(__file__))) 

Pretty simple; just specify the bind address for the server, the WSGI app to call, and in the case of the WebSocket server, the WebSocket handler class. The http_server above is mostly uninteresting, as it just serves up static files via paste. The ws_server has a (slightly) more interesting implementation:

class WebSocketApp(object): 
    '''Funnel messages coming from an inproc zmq socket to the websocket''' 
 
    def __init__(self, context): 
        self.context = context 
 
    def __call__(self, environ, start_response): 
        ws = environ['wsgi.websocket'] 
        sock = self.context.socket(zmq.SUB) 
        sock.setsockopt(zmq.SUBSCRIBE, "") 
        sock.connect('inproc://queue') 
        while True: 
            msg = sock.recv() 
            ws.send(msg) 

What's happening here is that when we get a connection to the websocket address (port 9999 in our example), we do the following:

  • Subscribe to the inproc socket that our zmq_server is publishing messages to
  • Grab the websocket from the environ
  • Relay messages from the zmq socket to the websocket
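
One thing the loop does not show is cleanup when a browser goes away. Below is a minimal sketch of the relay step with a `finally` clause that closes the subscriber socket. It assumes that either `recv()` or `send()` raises once the connection is gone; the exact exception depends on the library versions in use, so the sketch deliberately does not catch a specific type. `relay`, `FakeSocket`, and `FakeWebSocket` are hypothetical names used so the sketch runs without any network.

```python
def relay(sock, ws):
    '''Copy messages from sock to ws until either side raises, then clean up.'''
    try:
        while True:
            ws.send(sock.recv())
    finally:
        sock.close()  # release the per-connection subscriber socket


class FakeSocket(object):
    '''Test double: hands out queued messages, then raises to end the loop.'''

    def __init__(self, messages):
        self.messages = list(messages)
        self.closed = False

    def recv(self):
        if not self.messages:
            raise EOFError('no more messages')
        return self.messages.pop(0)

    def close(self):
        self.closed = True


class FakeWebSocket(object):
    '''Test double: records everything that was sent.'''

    def __init__(self):
        self.sent = []

    def send(self, msg):
        self.sent.append(msg)


sock, ws = FakeSocket(['a', 'b']), FakeWebSocket()
try:
    relay(sock, ws)
except EOFError:
    pass
print(ws.sent, sock.closed)
```

In the real app the same shape would wrap the `while True` loop inside `__call__`, so that each disconnected client gives its zmq socket back.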

Client-side

So the server-side stuff is straightforward; what about the client-side? Well, it's pretty easy as well. Here's the (static) HTML page we'll use:

<html> 
    <head> 
        <title>ZWS Example</title> 
        <script type="text/javascript" src="/flot/jquery.min.js"></script> 
        <script type="text/javascript" src="/flot/jquery.flot.min.js"></script> 
        <script type="text/javascript" src="graph.js"></script> 
    </head> 
    <body> 
        <h1>ZMQ - WebSocket Example</h1> 
        <div id="conn_status">Not Connected</div> 
        <div id="placeholder" style="width:600px;height:300px;"></div> 
    </body> 
</html> 

Here, all we're doing is pulling in the jQuery and Flot libraries as well as our custom graph.js and setting up a couple of placeholders. The JavaScript is also pretty straightforward. I've tried to provide inline commentary:

$(function() { 
    // Open up a connection to our server 
    var ws = new WebSocket("ws://localhost:9999/"); 
    // Save our plot placeholder 
    var $placeholder = $('#placeholder'); 
    // Maximum # of data points to plot 
    var datalen = 100; 
    // This will be the plot object 
    var plot = null; 
    // Set up some options on our data series 
    var series = { 
        label: "Value", 
        lines: { 
            show: true, 
            fill: true 
        }, 
        points: { 
            show:true 
        }, 
        data: [] 
    }; 
    // What do we do when we get a message? 
    ws.onmessage = function(evt) { 
        var d = $.parseJSON(evt.data); 
        series.data.push([d.x, d.y]); 
        // Keep the data series a manageable length 
        while (series.data.length > datalen) { 
            series.data.shift(); 
        } 
        if(plot) { 
            // Update the existing plot with the new data 
            plot.setData([series]); 
            plot.setupGrid(); 
            plot.draw(); 
        } else if(series.data.length > 10) { 
            // Create the plot once we have enough points 
            plot = $.plot($placeholder, [series], { 
                xaxis:{ 
                    mode: "time", 
                    timeformat: "%H:%M:%S", 
                    minTickSize: [2, "second"] 
                }, 
                yaxis: { 
                    min: 0, 
                    max: 5 
                } 
            }); 
            plot.draw(); 
        } 
    } 
    // Just update our conn_status field with the connection status 
    ws.onopen = function(evt) { 
        $('#conn_status').html('<b>Connected</b>'); 
    } 
    ws.onerror = function(evt) { 
        $('#conn_status').html('<b>Error</b>'); 
    } 
    ws.onclose = function(evt) { 
        $('#conn_status').html('<b>Closed</b>'); 
    } 
}); 
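
The push-then-shift loop in the message handler keeps a sliding window of the most recent points. For readers more at home on the Python side, the same idea can be sketched with `collections.deque` and its `maxlen` argument; this is only an illustration of the technique, not code from the example program.

```python
from collections import deque

DATALEN = 100  # same window size as datalen in graph.js

window = deque(maxlen=DATALEN)
for i in range(250):
    # Appending to a full deque silently drops the oldest entry
    window.append([i, i * 0.5])

print(len(window), window[0], window[-1])
```

After 250 appends only the last 100 points remain, just as the browser only ever plots the newest 100 samples.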

Putting it all together

To put everything together, here's the main function I used:

def main(): 
    '''Set up zmq context and greenlets for all the servers, then launch the web 
    browser and run the data producer''' 
    context = zmq.Context() 
 
    # zeromq: tcp to inproc gateway 
    gevent.spawn(zmq_server, context) 
    # websocket server: copies inproc zmq messages to websocket 
    ws_server = gevent.pywsgi.WSGIServer( 
        ('', 9999), WebSocketApp(context), 
        handler_class=WebSocketHandler) 
    # http server: serves up static files 
    http_server = gevent.pywsgi.WSGIServer( 
        ('', 8000), 
        paste.urlparser.StaticURLParser(os.path.dirname(__file__))) 
    # Start the server greenlets 
    http_server.start() 
    ws_server.start() 
    # Open a couple of webbrowsers 
    webbrowser.open('http://localhost:8000/graph.html') 
    webbrowser.open('http://localhost:8000/graph.html') 
    # Kick off the producer 
    zmq_producer(context) 

For fun, I threw a couple of webbrowser calls at the end so you can see the data getting distributed to all the clients that connect to our server. If you'd like to see the full program including flot et al., here's a download. Hope you enjoy playing around with gevent, ZeroMQ, and WebSockets as much as I did!

18 comments:

  1. I should point out that this was tested in Chrome only; I know that Firefox and Opera disabled WebSockets. YMMV.

  2. As somebody that has been fighting WebSockets for the past few months, I'm curious: Have you investigated other long-polling options, like Comet, Athena, Minerva, etc.?

  3. Klaus Lönze, 4:45 AM

    You prolly know this one already, but I recommend http://socket.io/ for a cross-browser websocket implementation.

  4. Nice integration.

    I've been looking for a similar integration with RabbitMQ for a while, but I haven't found one. Anyway, your post helped me get an idea of 0MQ's features. The socket handling is very nice; RabbitMQ is missing that part, I think ...:(

    If you have any ideas about RabbitMQ, please share.

    Thank you,
    Haridas N.

  5. Thanks for the comments!

    @CD Simpson: I did very little investigation of the other long-polling options. I will probably be looking into socket.io that @Klaus Lönze mentioned (there's a gevent-socketio package) and doing another post. I believe that socketio will do all sorts of graceful degradation even down to flash based on your browser.

    @Haridas: I've done some things with RabbitMQ for our work on Allura (http://sf.net/p/allura). RabbitMQ is nice, but as I mentioned, it requires a broker and it's quite a bit more complex to get set up. I believe there are some browser-side integrations with RabbitMQ (google for Stomp RabbitMQ), but I've never used them.

  6. By the way, I've put the working code up on my SourceForge homepage http://sf.net/u/rick446/gevent-socketio/ if you'd like to clone it from there. The latest version uses socket.io instead of "raw" websockets for more portability. I'll try to write that up soon.

    Replies
    1. Anonymous, 7:05 PM

      Hi Rick.

      The websocket example above works fine, but when I try to run the socketio example I get a KeyError: 'socketio' on line 61 ("socketio = environ['socketio']"). This happens on both Windows and Ubuntu. Can you please help me resolve this issue? I have no clue how to fix it.

    2. My guess is that somehow you're running either a) with different versions of packages that I used, or b) without the correct WSGI server. I am planning on a new blog post covering the latest version of socket.io in the near future, so stay tuned!

  7. Why do we need zmq_server() to retransmit the messages? Can't we just connect the socket in WebSocketApp to tcp://*:5000?

    Replies
    1. Thanks for the question! Honestly, it's been several months since I looked at this code and I don't know why I did that, either. The WebSocketApp *should* be able to directly subscribe to port 5000, I think.

    2. I can tell you why you did that, having used this article as a base for my own work.

      If you attempt multiple zmq connections to the same TCP endpoint within the same process, they will fail with an "address already in use" error.

      So this must be why you stuck the "inproc://queue" hop in there. I tried to take it out and hit the problem I just mentioned, so have reinstated it and now everything works like a charm :)

      Thanks for the article. Very useful.

    3. Thanks for taking the time to work through this, and thanks for the comment!

    4. But server can be removed if producer publishes directly to inproc:

    5. IvanZ - thanks for the comment. One of the reasons I had the producer going over the network, if I recall correctly, was to imagine how one would 'scale out' such a solution to run on multiple servers. In this case, it's certainly a case of over-architecting the solution, however.

  8. Anonymous, 8:06 AM

    Hi Rick,

    Thanks for the post.
    How can I get the session ID of a connection (user) from JS code when the connection to the web socket is established?

    Replies
    1. Hi Anonymous ;-)

      Thanks for the comment. One approach to getting a session ID is having the server send a user id to the browser on connect (or vice-versa) and storing it. So you'd have a 'sessionid' message or something like that which you would emit() from the server, listening for that event in JS.

      You might also want to check out http://blog.pythonisito.com/2012/07/realtime-web-chat-with-socketio-and.html for a tutorial on a more-recent version of socket.io.

      Thanks again!

  9. > you'll need to have libzmq-dev 2.1+

    This is no longer true for pyzmq 2.2. See http://pypi.python.org/pypi/pyzmq-static/2.2

    Replies
    1. Great news! Thanks for sharing this!
