Continuing in my ZeroMQ series, today I'd like to look at ZeroMQ "devices" and how you can integrate ZeroMQ with Gevent so you can combine the easy networking topologies of ZeroMQ with the cooperative multitasking of Gevent. If you're just getting started with ZeroMQ, you might want to check out the following articles:
And if you want some background on Gevent, you might want to check out that series at the following links:
- Introduction to Gevent
- Gevent, Threads, and Benchmarks
- Gevent and Greenlets
- Greening the Python Standard Library with Gevent
- Building TCP Servers with Gevent
- Building Web Applications with Gevent's WSGI Server
Once you're caught up, let's get started...
ZeroMQ "devices"
One of the nice aspects of ZeroMQ is that it decouples the communication pattern from the connection pattern of your endpoints. Without ZeroMQ, you'd commonly have a "server" socket which binds to a port and receives requests, while "clients" connect to that socket and send requests. With ZeroMQ, it's perfectly acceptable to have the "server" connect to the "client" to receive requests (and in fact you can have multiple "servers" connected to the same "client" socket).
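To make the reversed roles concrete, here is a minimal sketch in which the REQ "client" binds and the REP "server" connects. It assumes Python 3 and a recent pyzmq; the function name `reversed_roles_roundtrip` and the `echo-` reply prefix are made up for illustration.

```python
import zmq


def reversed_roles_roundtrip(payload=b"ping"):
    """Send one request from a bound REQ socket to a connected REP socket."""
    ctx = zmq.Context()
    client = ctx.socket(zmq.REQ)  # the "client" is the side that binds
    server = ctx.socket(zmq.REP)  # the "server" is the side that connects
    try:
        port = client.bind_to_random_port("tcp://127.0.0.1")
        server.connect("tcp://127.0.0.1:%d" % port)
        client.send(payload)              # waits until the server has connected
        request = server.recv()
        server.send(b"echo-" + request)   # hypothetical reply format
        return client.recv()
    finally:
        client.close(linger=0)
        server.close(linger=0)
        ctx.term()
```

The request/reply pattern is unchanged; only the direction in which the TCP connection is established has been flipped.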
While this is handy, in some cases you may want to have both client and server relatively dynamic, with both connecting and neither binding to a particular port. For this use case, ZeroMQ provides so-called "devices" that bind a couple of sockets and perform forwarding operations between them to support common communication patterns. The "client" and "server" then both connect to the device. In this section, I'll cover the devices provided by the ZeroMQ library.
Queue device
The Queue device is responsible for mediating the REQ/REP communication pattern. Suppose we have the following request code:
    import sys
    import zmq

    context = zmq.Context()

    for x in xrange(10):
        sock = context.socket(zmq.REQ)
        sock.connect(sys.argv[1])
        print 'REQ is', x,
        sock.send(str(x))
        print 'REP is', sock.recv()
and the following response code:
    import sys
    import zmq

    context = zmq.Context()

    while True:
        sock = context.socket(zmq.REP)
        sock.connect(sys.argv[1])
        x = sock.recv()
        print 'REQ is', x,
        reply = 'x-%s' % x
        sock.send(reply)
        print 'REP is', reply
To set up a broker that forwards between the two, you need a tiny script:
    import sys
    import zmq

    context = zmq.Context()

    s1 = context.socket(zmq.ROUTER)
    s2 = context.socket(zmq.DEALER)
    s1.bind(sys.argv[1])
    s2.bind(sys.argv[2])
    zmq.device(zmq.QUEUE, s1, s2)
Note that we've used a couple of new socket types above: zmq.ROUTER and zmq.DEALER. These are similar to zmq.REP and zmq.REQ, respectively, but they allow us to break the strict "request-response" lockstep of REQ/REP. The way they work is the following:
- The ROUTER socket receives a message on a particular connection. It adds a message ID (an identity frame) to the beginning of the message that identifies the sending REQ socket.
- The QUEUE device sends the message received on the ROUTER to the DEALER socket.
- The DEALER picks a connection to send the message to. The REP socket on that connection strips the prefix and holds on to the message ID while the request is being handled.
- The REP socket adds the saved message ID back to the beginning of its response, and the DEALER receives the response message.
- The QUEUE device sends the message received on the DEALER to the ROUTER socket.
- The ROUTER socket strips the message ID and sends the message to the REQ socket that sent the initial request.
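The message ID handling in the steps above can be observed directly by talking to a ROUTER socket from a REQ socket. This is a small sketch, assuming Python 3 and a recent pyzmq; `show_router_envelope` is a made-up helper name, and the `x-` reply prefix just mirrors the response script earlier in the article.

```python
import zmq


def show_router_envelope(body=b"hello"):
    """Return the frames a ROUTER sees for one REQ message, plus the reply."""
    ctx = zmq.Context()
    router = ctx.socket(zmq.ROUTER)
    req = ctx.socket(zmq.REQ)
    try:
        port = router.bind_to_random_port("tcp://127.0.0.1")
        req.connect("tcp://127.0.0.1:%d" % port)
        req.send(body)
        # The ROUTER delivers: [message ID of the sender, empty delimiter, body]
        frames = router.recv_multipart()
        identity, delimiter, request = frames
        # To answer, put the same message ID back in front of the reply.
        router.send_multipart([identity, delimiter, b"x-" + request])
        return frames, req.recv()
    finally:
        router.close(linger=0)
        req.close(linger=0)
        ctx.term()
```

The REQ side never sees the message ID or the empty delimiter frame; it only receives the reply body.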
By using message IDs as above, we can have multiple messages 'in flight', being handled by various servers. If you'd rather not use the built-in ZeroMQ device, you can build your own fairly simply. The code below shows a device that uses a pair of threads to relay messages from one socket to another:
    import sys
    import zmq
    import time

    context = zmq.Context()

    s1 = context.socket(zmq.ROUTER)
    s2 = context.socket(zmq.DEALER)
    s1.bind(sys.argv[1])
    s2.bind(sys.argv[2])

    def zeromq_relay(a, b):
        '''Copy data from zeromq socket a to zeromq socket b'''
        while True:
            msg = a.recv()
            more = a.getsockopt(zmq.RCVMORE)
            if more:
                b.send(msg, zmq.SNDMORE)
            else:
                b.send(msg)

    def zmq_queue_device(s1, s2):
        import threading
        t1 = threading.Thread(target=zeromq_relay, args=(s1,s2))
        t2 = threading.Thread(target=zeromq_relay, args=(s2,s1))
        t1.daemon = t2.daemon = True
        t1.start()
        t2.start()
        while True:
            time.sleep(10)

    zmq_queue_device(s1, s2)
WARNING
Though the code above seems to work OK, as Dan Fairs points out, ZeroMQ sockets are not thread-safe. So what you really want to do is use nonblocking IO and zmq.Poller(), but that's a bit beyond the scope of what I wanted to cover in this article.
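For readers who want to see the shape of the zmq.Poller() approach, here is a minimal single-threaded sketch. It assumes Python 3 and a recent pyzmq; the function name `poller_queue_device` and its `stop` and `timeout_ms` parameters are illustrative additions, not part of ZeroMQ. Both sockets are only ever touched from the thread that runs the loop, which avoids the thread-safety problem.

```python
import zmq


def poller_queue_device(frontend, backend, stop=None, timeout_ms=100):
    """Relay whole multipart messages in both directions from one thread.

    `stop` is a hypothetical hook: any object with an is_set() method
    (for example a threading.Event). Pass None to run forever.
    """
    poller = zmq.Poller()
    poller.register(frontend, zmq.POLLIN)
    poller.register(backend, zmq.POLLIN)
    while stop is None or not stop.is_set():
        # poll() returns (socket, event) pairs for sockets that are readable
        ready = dict(poller.poll(timeout_ms))
        if frontend in ready:
            backend.send_multipart(frontend.recv_multipart())
        if backend in ready:
            frontend.send_multipart(backend.recv_multipart())
```

Using recv_multipart() and send_multipart() keeps the message ID frames together with the body, so there is no need to check zmq.RCVMORE by hand.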
Forwarding device
In the same way the QUEUE device mediates the REQ/REP pattern, the FORWARDER device mediates the PUB/SUB pattern. Since PUB/SUB doesn't require the lockstep operation of REQ/REP, we can use the regular PUB/SUB sockets in our device:
    import sys
    import zmq

    context = zmq.Context()

    s1 = context.socket(zmq.SUB)
    s2 = context.socket(zmq.PUB)
    s1.bind(sys.argv[1])
    s2.bind(sys.argv[2])
    s1.setsockopt(zmq.SUBSCRIBE, '')
    zmq.device(zmq.FORWARDER, s1, s2)
Now we can connect one or more publishers to our device's "upstream" port (sys.argv[1]), and one or more subscribers to the device's "downstream" port (sys.argv[2]) to provide a PUB/SUB broker. Note in particular that we had to subscribe to all messages in the device code since we don't know which messages our downstream sockets are interested in.
If you'd rather filter messages that get forwarded, you can subscribe to some subset of messages. Unfortunately, I'm not aware of any way to forward only messages that the downstream clients have subscribed to using built-in ZeroMQ functionality.
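As a rough illustration of subscribing to a subset, the sketch below subscribes a SUB socket to a byte prefix and shows that only matching messages get through. It assumes Python 3 and a recent pyzmq; the helper name `receive_filtered` and the topic names in the usage note are invented for the example.

```python
import zmq


def receive_filtered(prefix, messages, attempts=50):
    """Publish `messages` until a SUB socket subscribed to `prefix` sees some.

    Returns the list of messages that passed the subscription filter.
    """
    ctx = zmq.Context()
    pub = ctx.socket(zmq.PUB)
    sub = ctx.socket(zmq.SUB)
    try:
        port = pub.bind_to_random_port("tcp://127.0.0.1")
        # Subscribe to a prefix instead of '' (which means "everything").
        sub.setsockopt(zmq.SUBSCRIBE, prefix)
        sub.connect("tcp://127.0.0.1:%d" % port)
        received = []
        for _ in range(attempts):
            # Early sends may be dropped while the subscription is still
            # being set up, so keep publishing until something arrives.
            for message in messages:
                pub.send(message)
            while sub.poll(100):  # wait up to 100 ms for a matching message
                received.append(sub.recv())
            if received:
                break
        return received
    finally:
        pub.close(linger=0)
        sub.close(linger=0)
        ctx.term()
```

For example, `receive_filtered(b"weather.", [b"sports.score", b"weather.rain"])` should only ever return copies of `b"weather.rain"`. In the device script, the same prefix would be passed to s1.setsockopt(zmq.SUBSCRIBE, ...) in place of the empty string.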
If we want to write our own FORWARDER device, it's even simpler than the QUEUE device since it only handles unidirectional communication. Assuming we have the zeromq_relay function as defined above, our FORWARDER device is just the following:
    def zmq_forwarder_device(upstream, downstream):
        zeromq_relay(upstream, downstream)
Streaming device
Similar to the FORWARDER device, the STREAMER device just sends upstream packets downstream, but in this case in support of the PUSH/PULL pattern rather than PUB/SUB. To make a STREAMER broker, then, we just need the following code:
    import sys
    import zmq

    context = zmq.Context()

    s1 = context.socket(zmq.PULL)
    s2 = context.socket(zmq.PUSH)
    s1.bind(sys.argv[1])
    s2.bind(sys.argv[2])
    zmq.device(zmq.STREAMER, s1, s2)
And once again, if we wanted to create the device manually, it's just a relay:
    def zmq_streaming_device(upstream, downstream):
        zeromq_relay(upstream, downstream)
Integration with gevent
You may have noticed that the ZeroMQ device function doesn't return. If you want to create multiple devices within your Python program, then, you'll need to wrap the devices in threads.
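One way to do the thread wrapping is the ThreadDevice helper that ships with pyzmq in the zmq.devices module. The sketch below assumes Python 3 and a recent pyzmq; `start_queue_device` is a made-up wrapper name. ThreadDevice creates its sockets inside the background thread, so nothing is shared across threads.

```python
import zmq
from zmq.devices import ThreadDevice


def start_queue_device(frontend_addr, backend_addr):
    """Run a QUEUE device (ROUTER in, DEALER out) in a background thread."""
    device = ThreadDevice(zmq.QUEUE, zmq.ROUTER, zmq.DEALER)
    # These calls only record the addresses; the actual sockets are
    # created and bound inside the device thread once start() is called.
    device.bind_in(frontend_addr)
    device.bind_out(backend_addr)
    device.start()  # returns immediately; the device keeps running
    return device
```

Calling this helper several times with different addresses gives several devices in one program, each in its own daemon thread.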
Another approach that you might use if you, like me, prefer the lightweight threading and asynchronous I/O of gevent, is to use the gevent-zeromq package available from PyPI:
$ pip install gevent-zeromq
Now we can use a 'green' version of ZeroMQ by just importing from the gevent-zeromq wrapper in our scripts. A "pusher" script would then look like this:
    import sys
    import time

    from gevent_zeromq import zmq

    context = zmq.Context.instance()
    sock = context.socket(zmq.PUSH)
    sock.connect(sys.argv[1])

    while True:
        time.sleep(1)
        sock.send(sys.argv[1] + ':' + time.ctime())
Simple, right? The reason I throw this in this seemingly-unrelated post is that if you want to use gevent, you can't use the built-in devices. This is because the built-in devices block, and they block in the ZeroMQ C library, not in Python where they could be "greened". So if you want a device with gevent, you'll have to write your own (which is, after all, pretty straightforward).
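A hand-written gevent device could look something like the sketch below. It assumes Python 3, gevent, and a recent pyzmq, and it imports zmq.green (the gevent-aware module bundled with pyzmq) rather than gevent_zeromq; that substitution and the names `green_relay` and `spawn_green_queue_device` are assumptions for illustration, not code from this series.

```python
import gevent
import zmq.green as zmq  # assumption: pyzmq's gevent-aware module, standing in for gevent_zeromq


def green_relay(src, dst):
    """Copy whole multipart messages from src to dst.

    recv_multipart() on a green socket yields to other greenlets
    while it waits, so this loop does not block the process.
    """
    while True:
        dst.send_multipart(src.recv_multipart())


def spawn_green_queue_device(frontend, backend):
    """Start one relay greenlet per direction and return both greenlets."""
    return [
        gevent.spawn(green_relay, frontend, backend),
        gevent.spawn(green_relay, backend, frontend),
    ]
```

Because the relays are greenlets in a single thread, the spawning function returns right away and the rest of the program keeps running alongside the device. A FORWARDER or STREAMER equivalent would need only one of the two greenlets.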
Conclusion
ZeroMQ devices provide a handy way to stitch together complex routing topologies and allow you to decouple the various components of your architecture. Though the built-in devices are quite simple, they provide insight into how you could build more complex devices yourself to fulfill the role of "broker" in a distributed architecture.
I'd be interested in hearing from you how you are using devices (or whether you find them useful at all) in ZeroMQ programming. Do you use the built-in devices? Any devices you write yourself? Do you prefer the multithreaded device approach I've used here, or using the zmq.Poller object? Let me know in the comments below!
"...combine the easy networking topologies of ZeroMQ with the cooperative multitasking of ZeroMQ"
That should probably be the "cooperative multitasking of gevent", right?
You are absolutely correct. Thanks for catching that for me! I've corrected the article.
Hi there!
I think your example using threads is a little dangerous, as you're sharing sockets between threads. This is unsupported. See the section on 'thread safety' here:
http://api.zeromq.org/2-1:zmq-socket
Otherwise, thanks for a great little article!
Dan,
Thanks for the comment! I had not realized that zmq sockets were not thread-safe. My guess is that I've only not seen crashing because I'd only send in one thread and recv in another, but it's certainly a bad approach. (Of course, with gevent_zeromq, the problem goes away since everything is implemented in a single thread).
Great catch!
Any example for using context in multiprocess?
Unfortunately I don't have an example handy right now, but if I recall correctly, things work best if you create the context after forking your subprocesses. (And if you're talking about the multiprocessing library in Python in particular, I don't have any experience making that work with 0MQ.)