As part of the work I've been doing on Zarkov I've had the opportunity to play around with a lot of cool technologies, among which are gevent, ZeroMQ, WebSockets, and Flot. It took a while to get to the point where I could actually get things done, but once I was there, things were incredibly simple. In this post I'll show you how to use these technologies together to build a simple web app with realtime server push data.
Prerequisites
Before you can run the example program, you'll need to install the following Python libraries:
- pyzmq
- gevent
- gevent-zeromq
- gevent-websocket
- paste (for a simple WSGI http static file server)
The easiest way to do this is to:
pip install pyzmq gevent gevent_zeromq gevent-websocket paste
You may see messages about missing system libraries; you'll need to have libzmq-dev 2.1+ and libevent-dev installed to build the Python packages.
You'll also need to download the Flot jQuery plugin and untar it in your project directory.
Greenlets
One of the things you'll need to understand before running the code is the concept of green threads or greenlets. These behave just like threads except for two caveats:
- they never run concurrently with one another
- they only yield to one another at specified points
What this ends up meaning is that they're very lightweight to create compared to threads and you can actually be a little sloppier with shared data between greenlets since you can deterministically know when a greenlet might yield to another greenlet. The gevent library provides our greenlet support, and makes greenlets even more useful by turning blocking system calls into yield points so that another greenlet can run while one is waiting on data.
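To make the "yield only at specified points" idea concrete, here is a minimal sketch that models greenlets with plain Python generators. It deliberately does not use gevent: the `worker` and `run_round_robin` names are hypothetical, and a real greenlet yields implicitly on blocking calls rather than at a literal `yield` statement. The point is only that the interleaving is deterministic.

```python
from collections import deque


def worker(name, steps, log):
    """A toy 'greenlet': runs until it reaches an explicit yield point."""
    for i in range(steps):
        log.append(f"{name}:{i}")
        yield  # the only place where control can pass to another task


def run_round_robin(tasks):
    """Minimal cooperative scheduler: resume each task in turn until all finish."""
    ready = deque(tasks)
    while ready:
        task = ready.popleft()
        try:
            next(task)          # run the task up to its next yield point
        except StopIteration:
            continue            # task finished, do not reschedule it
        ready.append(task)


log = []
run_round_robin([worker("a", 2, log), worker("b", 3, log)])
print(log)
```

Because no task is ever interrupted between yield points, the shared `log` list needs no locking, which is the "sloppier with shared data" property described above.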
ZeroMQ
ZeroMQ sounds like it's a message queue like RabbitMQ, but it's really not. ZeroMQ is a library that provides many message queue-like features, but requires no broker and very little setup. We'll be using ZeroMQ pub/sub sockets to manage communication in this demo. Pub/sub sockets have the nice feature that published messages get distributed to all subscribers currently connected, or if no subscribers are connected, the message simply gets dropped.
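The delivery rule for pub/sub can be illustrated with a toy in-memory model. This is not ZeroMQ and makes no claim about its internals; `ToyPublisher` is a hypothetical class that only mimics the two behaviours described above: fan-out to every current subscriber, and silent dropping when nobody is listening.

```python
class ToyPublisher:
    """In-memory stand-in for a PUB socket (not ZeroMQ itself)."""

    def __init__(self):
        self._subscribers = []

    def subscribe(self):
        """Attach a new subscriber and return its private inbox."""
        inbox = []
        self._subscribers.append(inbox)
        return inbox

    def publish(self, message):
        """Copy the message to every current subscriber; return how many got it."""
        for inbox in self._subscribers:
            inbox.append(message)
        return len(self._subscribers)


pub = ToyPublisher()
delivered_early = pub.publish("early")  # nobody is subscribed yet, so it is dropped
first = pub.subscribe()
second = pub.subscribe()
pub.publish("late")                     # each subscriber gets its own copy
```

Neither subscriber ever sees "early", which is why a browser that connects late in the demo only starts plotting from the moment it connects.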
ZeroMQ was designed to work with threads rather than greenlets, so we'll also need to use the excellent gevent_zeromq library to "greenletize" ZeroMQ. For this example, we'll write a loop that pushes (x,y) point values out on a zmq.PUB socket:
    def zmq_producer(context):
        '''Produce a nice time series sine wave'''
        socket = context.socket(zmq.PUB)
        socket.connect('tcp://127.0.0.1:5000')
        while True:
            x = time.time()
            y = 2.5 * (1 + math.sin(x / 500))
            socket.send(json.dumps(dict(x=x, y=y)))
            gevent.sleep(0.1)
The context being passed in is created in our main() function with the call zmq.Context(). The context is just a place for ZeroMQ to store some global state; you generally only create one per application.
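It can help to look at a single message from the producer in isolation. The sketch below uses only the standard library and reuses the producer's formula; `make_point` is a hypothetical helper, and the `x_ms` field is an assumption added for illustration, not part of the original message. As far as I know, Flot's time mode interprets x values as JavaScript timestamps in milliseconds, so a millisecond value may be what you want to plot if the axis labels look wrong.

```python
import json
import math
import time


def make_point(now=None):
    """Build one JSON message shaped like the producer's output."""
    x = time.time() if now is None else now
    y = 2.5 * (1 + math.sin(x / 500))  # same formula as zmq_producer
    # Assumption: x_ms is an extra, illustrative field (milliseconds since the epoch).
    return json.dumps({"x": x, "y": y, "x_ms": int(x * 1000)})


point = json.loads(make_point(now=0.0))
period_seconds = 2 * math.pi * 500     # one full cycle of sin(x / 500)
print(point, period_seconds)
```

The formula keeps y between 0 and 5, and one full cycle takes roughly 52 minutes. With a message every 0.1 seconds and 100 points kept on the client, the visible window is about 10 seconds, so the curve will look nearly flat; dividing by a smaller number than 500 gives a more visible wave.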
OK, now that we have our producer, let's look at the ZeroMQ server. What we'll do here is just relay messages received on an incoming tcp zmq.SUB socket and publish them on an outgoing inproc zmq.PUB socket:
    def zmq_server(context):
        '''Funnel messages coming from the external tcp socket to an inproc socket'''
        sock_incoming = context.socket(zmq.SUB)
        sock_outgoing = context.socket(zmq.PUB)
        sock_incoming.bind('tcp://*:5000')
        sock_outgoing.bind('inproc://queue')
        sock_incoming.setsockopt(zmq.SUBSCRIBE, "")
        while True:
            msg = sock_incoming.recv()
            sock_outgoing.send(msg)
The only thing to note here is that we have to tell ZeroMQ that we want to subscribe to all messages on the zmq.SUB socket.
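The empty string works as "subscribe to everything" because a SUB socket filters by message prefix, and every message starts with the empty prefix. Here is a small standard-library sketch of that matching rule; `matches` is a hypothetical helper that models the filter, not a pyzmq function.

```python
def matches(subscriptions, message):
    """Return True if the message starts with any subscribed prefix."""
    return any(message.startswith(prefix) for prefix in subscriptions)


msg = b'{"x": 1, "y": 2.5}'

everything = [b""]           # what setsockopt(zmq.SUBSCRIBE, "") asks for
only_weather = [b"weather"]  # a narrower, made-up topic prefix
nothing = []                 # no SUBSCRIBE call at all

print(matches(everything, msg))
print(matches(only_weather, msg))
print(matches(nothing, msg))
```

The last case is the one that bites people: a SUB socket with no subscription set receives nothing, which is why the setsockopt call is required.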
Gevent WSGI Servers
Gevent provides a fast implementation of the WSGI standard in the gevent.pywsgi module. We'll be using two instances of the server (greenlets are lightweight) in this demo. One will simply serve up static files, while the other one will provide our WebSocket connectivity. Here's the code to set up these two servers:
    # websocket server: copies inproc zmq messages to websocket
    ws_server = gevent.pywsgi.WSGIServer(
        ('', 9999), WebSocketApp(context),
        handler_class=WebSocketHandler)
    # http server: serves up static files
    http_server = gevent.pywsgi.WSGIServer(
        ('', 8000),
        paste.urlparser.StaticURLParser(os.path.dirname(__file__)))
Pretty simple; just specify the bind address for the server, the WSGI app to call, and in the case of the WebSocket server, the WebSocket handler class. The http_server above is mostly uninteresting, as it just serves up static files via paste. The ws_server has a (slightly) more interesting implementation:
    class WebSocketApp(object):
        '''Funnel messages coming from an inproc zmq socket to the websocket'''

        def __init__(self, context):
            self.context = context

        def __call__(self, environ, start_response):
            ws = environ['wsgi.websocket']
            sock = self.context.socket(zmq.SUB)
            sock.setsockopt(zmq.SUBSCRIBE, "")
            sock.connect('inproc://queue')
            while True:
                msg = sock.recv()
                ws.send(msg)
What's happening here is that when we get a connection to the websocket address (port 9999 in our example), we do the following:
- Subscribe to the inproc socket that our zmq_server is publishing messages to
- Grab the websocket from the environ
- Relay messages from the zmq socket to the websocket
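One fragile spot in the steps above is the environ lookup: a plain HTTP request, or a server started without the WebSocket handler class, has no 'wsgi.websocket' key, so indexing the environ raises a KeyError. The following is a hedged sketch of a more defensive relay, runnable with the standard library alone. `make_app`, `FakeWebSocket`, and the `messages` iterable are hypothetical stand-ins (the last one for the zmq SUB socket), and the 400 response is my choice rather than anything the original code does.

```python
def make_app(messages):
    """Build a WSGI app that relays each item of `messages` to the websocket.

    `messages` is a hypothetical iterable standing in for the zmq SUB socket.
    """
    def app(environ, start_response):
        ws = environ.get('wsgi.websocket')
        if ws is None:
            # Plain HTTP request: no handler put a websocket in the environ.
            start_response('400 Bad Request', [('Content-Type', 'text/plain')])
            return [b'Expected a WebSocket connection\n']
        for msg in messages:
            ws.send(msg)
        return []
    return app


class FakeWebSocket:
    """Test double that records what would have been sent to the browser."""

    def __init__(self):
        self.sent = []

    def send(self, msg):
        self.sent.append(msg)


statuses = []


def fake_start_response(status, headers):
    """Record the status line instead of writing an HTTP response."""
    statuses.append(status)


app = make_app(['{"x": 1, "y": 2}'])
plain_body = app({}, fake_start_response)         # no websocket in the environ
ws = FakeWebSocket()
app({'wsgi.websocket': ws}, fake_start_response)  # websocket present
```

Passing the message source in as an argument also makes the relay testable without opening any sockets.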
Client-side
So the server-side stuff is straightforward; what about the client-side? Well, it's pretty easy as well. Here's the (static) HTML page we'll use:
    <html>
      <head>
        <title>ZWS Example</title>
        <script type="text/javascript" src="/flot/jquery.min.js"></script>
        <script type="text/javascript" src="/flot/jquery.flot.min.js"></script>
        <script type="text/javascript" src="graph.js"></script>
      </head>
      <body>
        <h1>ZMQ - WebSocket Example</h1>
        <div id="conn_status">Not Connected</div>
        <div id="placeholder" style="width:600px;height:300px;"></div>
      </body>
    </html>
Here, all we're doing is pulling in the jQuery and Flot libraries as well as our custom graph.js and setting up a couple of placeholders. The JavaScript is also pretty straightforward. I've tried to provide inline commentary:
    $(function() {
        // Open up a connection to our server
        var ws = new WebSocket("ws://localhost:9999/");
        // Save our plot placeholder
        var $placeholder = $('#placeholder');
        // Maximum # of data points to plot
        var datalen = 100;
        // This will be the plot object
        var plot = null;
        // Set up some options on our data series
        var series = {
            label: "Value",
            lines: {
                show: true,
                fill: true
            },
            points: {
                show:true
            },
            data: []
        };
        // What do we do when we get a message?
        ws.onmessage = function(evt) {
            var d = $.parseJSON(evt.data);
            series.data.push([d.x, d.y]);
            // Keep the data series a manageable length
            while (series.data.length > datalen) {
                series.data.shift();
            }
            if(plot) {
                // Update the existing plot
                plot.setData([series]);
                plot.setupGrid();
                plot.draw();
            } else if(series.data.length > 10) {
                // Create the plot once we have a few points
                plot = $.plot($placeholder, [series], {
                    xaxis:{
                        mode: "time",
                        timeformat: "%H:%M:%S",
                        minTickSize: [2, "second"],
                    },
                    yaxis: {
                        min: 0,
                        max: 5
                    }
                });
                plot.draw();
            }
        }
        // Just update our conn_status field with the connection status
        ws.onopen = function(evt) {
            $('#conn_status').html('<b>Connected</b>');
        }
        ws.onerror = function(evt) {
            $('#conn_status').html('<b>Error</b>');
        }
        ws.onclose = function(evt) {
            $('#conn_status').html('<b>Closed</b>');
        }
    });
Putting it all together
To put everything together, here's the main function I used:
    def main():
        '''Set up zmq context and greenlets for all the servers, then launch the web
        browser and run the data producer'''
        context = zmq.Context()

        # zeromq: tcp to inproc gateway
        gevent.spawn(zmq_server, context)
        # websocket server: copies inproc zmq messages to websocket
        ws_server = gevent.pywsgi.WSGIServer(
            ('', 9999), WebSocketApp(context),
            handler_class=WebSocketHandler)
        # http server: serves up static files
        http_server = gevent.pywsgi.WSGIServer(
            ('', 8000),
            paste.urlparser.StaticURLParser(os.path.dirname(__file__)))
        # Start the server greenlets
        http_server.start()
        ws_server.start()
        # Open a couple of webbrowsers
        webbrowser.open('http://localhost:8000/graph.html')
        webbrowser.open('http://localhost:8000/graph.html')
        # Kick off the producer
        zmq_producer(context)
For fun, I threw a couple of webbrowser calls at the end so you can see the data getting distributed to all the clients that connect to our server. If you'd like to see the full program including Flot et al., here's a download. Hope you enjoy playing around with gevent, ZeroMQ, and WebSockets as much as I did!
I should point out that this was tested in Chrome only; I know that Firefox and Opera disabled WebSockets. YMMV.
As somebody that has been fighting WebSockets for the past few months, I'm curious: Have you investigated other long-polling options, like Comet, Athena, Minerva, etc.?
You prolly know this one already, but I recommend http://socket.io/ for a cross-browser websocket implementation.
Nice integration.
I'm looking for the similar integration with RabbitMQ for a while, But I didn't got one. Any way your post helped me to get an Idea about 0MQ features. The socket handling was very nice, RabbitMQ missing that part I think ...:(
If you have any Idea about RabbitMQ please share.
Thank you,
Haridas N.
Thanks for the comments!
@CD Simpson: I did very little investigation of the other long-polling options. I will probably be looking into socket.io that @Klaus Lönze mentioned (there's a gevent-socketio package) and doing another post. I believe that socketio will do all sorts of graceful degradation even down to flash based on your browser.
@Haridas: I've done some things with RabbitMQ for our work on Allura (http://sf.net/p/allura). RabbitMQ is nice, but as I mentioned, it requires a broker and it's quite a bit more complex to get set up. I believe there are some browser-side integrations with RabbitMQ (google for Stomp RabbitMQ), but I've never used them.
By the way, I've put the working code up on my SourceForge homepage http://sf.net/u/rick446/gevent-socketio/ if you'd like to clone it from there. The latest version uses socket.io instead of "raw" websockets for more portability. I'll try to write that up soon.
Hi Rick.
The websocket example above works fine, but when I try to run the socketio example I get an KeyError: 'socketio' on line 61 ("socketio = environ['socketio']"). This happens on both the Windows and the Ubuntu platform. Can you please help me resolve this issue since I have no clue how to fix it.
My guess is that somehow you're running either a) with different versions of packages that I used, or b) without the correct WSGI server. I am planning on a new blog post covering the latest version of socket.io in the near future, so stay tuned!
Why do we need zmq_server() to retransmit the messages. Can't we just connect the socket in WebSocketApp to tcp://*:5000?
Thanks for the question! Honestly, it's been several months since I looked at this code and I don't know why I did that, either. The WebSocketApp *should* be able to directly subscribe to port 5000, I think.
I can tell you why you did that, having used this article as a base for my own work.
If you attempt multiple zmq connections to the same TCP endpoint within the same process, they will fail with an "address already in use" error.
So this must be why you stuck the "inproc://queue" hop in there. I tried to take it out and hit the problem I just mentioned, so have reinstated it and now everything works like a charm :)
Thanks for the article. Very useful.
Thanks for taking the time to work through this, and thanks for the comment!
But server can be removed if producer publishes directly to inproc:
IvanZ - thanks for the comment. One of the reasons I had the producer going over the network, if I recall correctly, was to imagine how one would 'scale out' such a solution to run on multiple servers. In this case, it's certainly a case of over-architecting the solution, however.
Hi Rick,
Thanks for the post.
How can I get the session ID of a connection (user) when the connection established to the web socket, from JS code?
Hi Anonymous ;-)
Thanks for the comment. One approach to getting a session ID is having the server send a user id to the browser on connect (or vice-versa) and storing it. So you'd have a 'sessionid' message or something like that which you would emit() from the server, listening for that event in JS.
You might also want to check out http://blog.pythonisito.com/2012/07/realtime-web-chat-with-socketio-and.html for a tutorial on a more-recent version of socket.io.
Thanks again!
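For what it's worth, the server side of that session ID idea could look something like the sketch below. It only builds the message; how it is sent (ws.send, emit, and so on) depends on the library in use. `make_session_message` and the 'type'/'sessionid' field names are hypothetical, chosen just for illustration.

```python
import json
import uuid


def make_session_message():
    """Create a fresh session id and the JSON message that announces it."""
    session_id = uuid.uuid4().hex
    # Assumption: these field names are illustrative, not a socket.io convention.
    message = json.dumps({"type": "sessionid", "sessionid": session_id})
    return session_id, message


session_id, message = make_session_message()
decoded = json.loads(message)  # what the browser would see after parsing
print(decoded)
```

The server would keep `session_id` alongside the connection, and the browser would store the value it receives and include it in later messages.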
> you'll need to have libzmq-dev 2.1+
This is no longer true for pyzmq 2.2. See http://pypi.python.org/pypi/pyzmq-static/2.2
Great news! Thanks for sharing this!