Continuing in my ZeroMQ series, today I'd like to look at ZeroMQ "devices" and how you can integrate ZeroMQ with Gevent so you can combine the easy networking topologies of ZeroMQ with the cooperative multitasking of Gevent. If you're just getting started with ZeroMQ, you might want to check out the following articles:
And if you want some background on Gevent, you might want to check out that series at the following links:
- Introduction to Gevent
- Gevent, Threads, and Benchmarks
- Gevent and Greenlets
- Greening the Python Standard Library with Gevent
- Building TCP Servers with Gevent
- Building Web Applications with Gevent's WSGI Server
One of the nice aspects of ZeroMQ is that it decouples the communication pattern from the connection pattern of your endpoints. Without ZeroMQ, you'd commonly have a "server" socket which binds to a port and receives requests, while "clients" connect to that socket and send requests. With ZeroMQ, it's perfectly acceptable to have the "server" connect to the "client" to receive requests (and in fact you can have multiple "servers" connected to the same "client" socket).
While this is handy, in some cases you may want to have both client and server relatively dynamic, with both connecting and neither binding to a particular port. For this use case, ZeroMQ provides so-called "devices" that bind a couple of sockets and perform forwarding operations between them to support common communication patterns. The "client" and "server" then both connect to the device. In this section, I'll cover the devices provided by the ZeroMQ library.
The QUEUE device is responsible for mediating the REQ/REP pattern. Suppose we have the following request code:

    import sys
    import zmq

    context = zmq.Context()

    for x in xrange(10):
        sock = context.socket(zmq.REQ)
        sock.connect(sys.argv[1])  # endpoint URL given on the command line
        print 'REQ is', x,
        sock.send(str(x))
        print 'REP is', sock.recv()

and the following response code:

    import sys
    import zmq

    context = zmq.Context()

    while True:
        sock = context.socket(zmq.REP)
        sock.connect(sys.argv[1])  # endpoint URL given on the command line
        x = sock.recv()
        print 'REQ is', x,
        reply = 'x-%s' % x
        sock.send(reply)
        print 'REP is', reply

To set up a broker that forwards between the two, you need a tiny script:

    import sys
    import zmq

    context = zmq.Context()

    s1 = context.socket(zmq.ROUTER)
    s2 = context.socket(zmq.DEALER)
    s1.bind(sys.argv[1])  # the REQ side connects here
    s2.bind(sys.argv[2])  # the REP side connects here
    zmq.device(zmq.QUEUE, s1, s2)

Note that we've used a couple of new socket types above: zmq.ROUTER and zmq.DEALER. These are similar to zmq.REP and zmq.REQ, respectively, but they allow us to break the strict "request-response" lockstep of REQ/REP. The way they work is the following:
- The ROUTER socket receives a message on a particular connection. It adds a message ID to the beginning of the message that identifies the sending REQ socket.
- The QUEUE device sends the message received on the ROUTER socket to the DEALER socket.
- The DEALER picks a connection to send the message to and passes it along with the message ID still attached. The REP socket handling the message strips the prefix and holds on to it until it replies.
- The REP socket adds the saved message ID to the beginning of its response, and the DEALER receives the response message.
- The QUEUE device sends the message received on the DEALER socket to the ROUTER socket.
- The ROUTER socket strips the message ID and sends the message to the REQ socket that sent the initial request.
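To make the round trip concrete, here is a small model of the message ID handling in plain Python (written for Python 3). It does not touch ZeroMQ at all: the function names and the `client-A` identity are made up for illustration, and it assumes the usual framing in which a REQ socket sends an empty delimiter frame ahead of its payload and the REP socket is the one that sets the prefix aside and restores it.

```python
def router_recv(identity, frames):
    """ROUTER side: prefix the incoming frames with the sender's identity."""
    return [identity] + frames


def rep_recv(frames):
    """REP side: split the routing envelope from the payload.

    The envelope is everything up to and including the empty delimiter frame.
    """
    cut = frames.index(b"") + 1
    return frames[:cut], frames[cut:]


def rep_send(envelope, payload):
    """REP side: put the saved envelope back in front of the reply."""
    return envelope + payload


def router_send(frames):
    """ROUTER side: pop the identity to choose the connection to reply on."""
    return frames[0], frames[1:]


# Hypothetical request: empty delimiter frame, then the payload.
request = [b"", b"7"]
at_broker = router_recv(b"client-A", request)   # what the device forwards
envelope, payload = rep_recv(at_broker)         # what the REP handler works with
reply = rep_send(envelope, [b"x-" + payload[0]])
target, to_client = router_send(reply)
print(target, to_client)
```

Because the identity travels with every message, the broker itself never has to remember which request belongs to which client.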
By using message IDs as above, we can have multiple messages 'in flight', being handled by various servers. If you'd rather not use the built-in ZeroMQ device, you can build your own fairly simply. The code below shows a device that uses a pair of threads to relay messages from one socket to another:

    import sys
    import zmq
    import time

    context = zmq.Context()

    s1 = context.socket(zmq.ROUTER)
    s2 = context.socket(zmq.DEALER)
    s1.bind(sys.argv[1])
    s2.bind(sys.argv[2])

    def zeromq_relay(a, b):
        '''Copy data from zeromq socket a to zeromq socket b'''
        while True:
            msg = a.recv()
            # Keep multipart messages intact by passing the "more" flag along
            more = a.getsockopt(zmq.RCVMORE)
            if more:
                b.send(msg, zmq.SNDMORE)
            else:
                b.send(msg)

    def zmq_queue_device(s1, s2):
        import threading
        # One thread per direction
        t1 = threading.Thread(target=zeromq_relay, args=(s1, s2))
        t2 = threading.Thread(target=zeromq_relay, args=(s2, s1))
        t1.daemon = t2.daemon = True
        t1.start()
        t2.start()
        # Keep the main thread alive while the daemon threads do the work
        while True:
            time.sleep(10)

    zmq_queue_device(s1, s2)

The code above seems to work OK, but as Dan Fairs points out, ZeroMQ sockets are not thread-safe. So what you really want to do is use nonblocking IO and zmq.Poller(), but that's a bit beyond the scope of what I wanted to cover in this article.
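For the curious, a poller-based device might look roughly like the sketch below (written for Python 3). Treat it as one possible shape rather than a tested replacement for zmq.device: `poll_queue_device`, `main`, and the `max_polls` argument are names I've made up for illustration, and the relay loop only assumes objects that behave like pyzmq sockets and a zmq.Poller with both sockets registered for zmq.POLLIN.

```python
def poll_queue_device(frontend, backend, poller, max_polls=None):
    """Relay whole multipart messages in both directions from a single thread.

    max_polls is a hypothetical knob that lets a caller stop the otherwise
    endless loop; leave it as None to run forever.
    """
    polls = 0
    while max_polls is None or polls < max_polls:
        polls += 1
        ready = dict(poller.poll())  # maps each readable socket to its event mask
        if frontend in ready:
            backend.send_multipart(frontend.recv_multipart())
        if backend in ready:
            frontend.send_multipart(backend.recv_multipart())


def main(frontend_url, backend_url):
    # Imported here so the loop above can be tried out without pyzmq installed.
    import zmq

    context = zmq.Context()
    frontend = context.socket(zmq.ROUTER)
    backend = context.socket(zmq.DEALER)
    frontend.bind(frontend_url)
    backend.bind(backend_url)

    poller = zmq.Poller()
    poller.register(frontend, zmq.POLLIN)
    poller.register(backend, zmq.POLLIN)
    poll_queue_device(frontend, backend, poller)
```

Since only one thread ever touches the two sockets, the thread-safety concern goes away. Calling `main()` with the same two endpoint URLs as the threaded version would start the device.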
In the same way the QUEUE device mediates the REQ/REP pattern, the FORWARDER device mediates the PUB/SUB pattern. Since PUB/SUB doesn't require the lockstep operation of REQ/REP, we can use the regular PUB/SUB sockets in our device:
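
    import sys
    import zmq

    context = zmq.Context()

    s1 = context.socket(zmq.SUB)
    s2 = context.socket(zmq.PUB)
    s1.bind(sys.argv[1])  # "upstream": publishers connect here
    s2.bind(sys.argv[2])  # "downstream": subscribers connect here
    s1.setsockopt(zmq.SUBSCRIBE, '')  # empty prefix = subscribe to everything
    zmq.device(zmq.FORWARDER, s1, s2)
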
Now we can connect one or more publishers to our device's "upstream" port (sys.argv[1]), and one or more subscribers to the device's "downstream" port (sys.argv[2]) to provide a PUB/SUB broker. Note in particular that we had to subscribe to all messages in the device code since we don't know which messages our downstream sockets are interested in.
If you'd rather filter messages that get forwarded, you can subscribe to some subset of messages. Unfortunately, I'm not aware of any way to forward only messages that the downstream clients have subscribed to using built-in ZeroMQ functionality.
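Subscription filters match on the leading bytes of a message, so "some subset" in practice means "messages starting with one of these prefixes". The snippet below is only a plain-Python model of that rule (written for Python 3), not ZeroMQ code; the `matches` helper and the topic names are invented for the example.

```python
def matches(subscriptions, message):
    """Return True if the message starts with any subscribed prefix.

    An empty prefix (b"") matches every message, which is what subscribing
    with an empty string amounts to.
    """
    return any(message.startswith(prefix) for prefix in subscriptions)


# Hypothetical traffic arriving at the device's upstream socket.
messages = [b"weather.nyc 71F", b"sports.nba final", b"weather.sf 58F"]

forward_all = [b""]
weather_only = [b"weather."]

print([m for m in messages if matches(forward_all, m)])
print([m for m in messages if matches(weather_only, m)])
```

In a real device, the equivalent of `weather_only` would be a SUBSCRIBE option set to that prefix on the upstream SUB socket in place of the empty string.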
If we want to write our own FORWARDER device, it's even simpler than the QUEUE device since it only handles unidirectional communication. Assuming we have the zeromq_relay function as defined above, our FORWARDER device is just

    def zmq_forwarder_device(upstream, downstream):
        zeromq_relay(upstream, downstream)

Similar to the FORWARDER device, the STREAMER device just sends upstream packets downstream, but in this case in support of the PUSH/PULL pattern rather than PUB/SUB. To make a STREAMER broker, then, we just need the following code:

    import sys
    import zmq

    context = zmq.Context()

    s1 = context.socket(zmq.PULL)
    s2 = context.socket(zmq.PUSH)
    s1.bind(sys.argv[1])  # "upstream": pushers connect here
    s2.bind(sys.argv[2])  # "downstream": pullers connect here
    zmq.device(zmq.STREAMER, s1, s2)

And once again, if we wanted to create the device manually, it's just a relay:

    def zmq_streaming_device(upstream, downstream):
        zeromq_relay(upstream, downstream)

Integration with gevent
You may have noticed that the ZeroMQ device function doesn't return. If you want to create multiple devices within your Python program, then, you'll need to wrap the devices in threads.
Another approach that you might use if you, like me, prefer the lightweight threading and asynchronous I/O of gevent, is to use the gevent-zeromq package available from PyPI:
$ pip install gevent-zeromq
Now we can use a 'green' version of ZeroMQ by just importing from the gevent-zeromq wrapper in our scripts. A "pusher" script would then look like the following:

    import sys
    import time
    from gevent_zeromq import zmq

    context = zmq.Context.instance()

    sock = context.socket(zmq.PUSH)
    sock.connect(sys.argv[1])

    while True:
        time.sleep(1)
        sock.send(sys.argv[1] + ':' + time.ctime())

Simple, right? The reason I throw this into this seemingly unrelated post is that if you want to use gevent, you can't use the built-in devices. This is because the built-in devices block, and they block in the ZeroMQ C library, not in Python where they could be "greened". So if you want a device with gevent, you'll have to write your own (which is, after all, pretty straightforward).
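As a rough idea of what such a hand-written device could look like, here is a sketch of a QUEUE-style device built from two greenlets (written for Python 3). It rests on a few assumptions: that the green sockets from gevent_zeromq support recv_multipart() and send_multipart() the way regular pyzmq sockets do, and that one greenlet per direction is acceptable. `green_relay`, `green_queue_device`, and `max_messages` are names made up for this example.

```python
def green_relay(src, dst, max_messages=None):
    """Copy whole multipart messages from src to dst.

    With green sockets, recv_multipart() is expected to yield to other
    greenlets while it waits. max_messages is a hypothetical stop condition;
    leave it as None to relay forever. Returns the number of messages copied.
    """
    count = 0
    while max_messages is None or count < max_messages:
        dst.send_multipart(src.recv_multipart())
        count += 1
    return count


def green_queue_device(frontend_url, backend_url):
    # Imported here so green_relay can be tried out without gevent installed.
    import gevent
    from gevent_zeromq import zmq

    context = zmq.Context()
    frontend = context.socket(zmq.ROUTER)
    backend = context.socket(zmq.DEALER)
    frontend.bind(frontend_url)
    backend.bind(backend_url)

    # One greenlet per direction; both run cooperatively in the same thread.
    workers = [
        gevent.spawn(green_relay, frontend, backend),
        gevent.spawn(green_relay, backend, frontend),
    ]
    gevent.joinall(workers)
```

Because greenlets all run in one OS thread, this avoids the thread-safety problem of the threaded relay while keeping the same two-relay structure.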
ZeroMQ devices provide a handy way to stitch together complex routing topologies and allow you to decouple the various components of your architecture. Though the built-in devices are quite simple, they provide insight into how you could build more complex devices yourself to fulfill the role of "broker" in a distributed architecture.
I'd be interested in hearing from you how you are using devices (or whether you find them useful at all) in ZeroMQ programming. Do you use the built-in devices? Any devices you write yourself? Do you prefer the multithreaded device approach I've used here, or using the zmq.Poller object? Let me know in the comments below!