Tuesday, August 28, 2012

Using ZeroMQ devices to support complex network topologies

Continuing in my ZeroMQ series, today I'd like to look at ZeroMQ "devices" and how you can integrate ZeroMQ with Gevent so you can combine the easy networking topologies of ZeroMQ with the cooperative multitasking of gevent. If you're just getting started with ZeroMQ, you might want to check out the following articles:

And if you want some background on Gevent, you might want to check out that series at the following links:

Once you're caught up, let's get started...

ZeroMQ "devices"

One of the nice aspects of ZeroMQ is that it decouples the communication pattern from the connection pattern of your endpoints. Without ZeroMQ, you'd commonly have a "server" socket that binds to a port and receives requests, while "clients" connect to that socket and send requests. With ZeroMQ, it's perfectly acceptable to have the "server" connect to the "client" to receive requests (and in fact you can have multiple "servers" connected to the same "client" socket).

While this is handy, in some cases you may want both client and server to be relatively dynamic, with both connecting and neither binding to a particular port. For this use case, ZeroMQ provides so-called "devices" that bind two sockets and forward messages between them to support common communication patterns. The "client" and "server" then both connect to the device. In this section, I'll cover the devices provided by the ZeroMQ library.

Queue device

The Queue device is responsible for mediating the REQ/REP communication pattern. Suppose we have the following request code:

import sys
import zmq

context = zmq.Context()

# Create and connect the REQ socket once, then reuse it for every request
sock = context.socket(zmq.REQ)
sock.connect(sys.argv[1])

for x in xrange(10):
    print 'REQ is', x,
    sock.send(str(x))
    print 'REP is', sock.recv()

and the following response code:

import sys
import zmq

context = zmq.Context()

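# Create and connect the REP socket once. A REP socket created on every
# loop iteration and never closed would stay connected, and the broker
# would keep handing it requests that nothing ever reads.
sock = context.socket(zmq.REP)
sock.connect(sys.argv[1])

while True:
    x = sock.recv()
    print 'REQ is', x,
    reply = 'x-%s' % x
    sock.send(reply)
    print 'REP is', reply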
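
Both scripts depend on the strict alternation that REQ and REP sockets enforce: a REQ socket must send, then receive, then send again, and a REP socket does the reverse. The toy state machine below is only a sketch of that rule; the `LockstepSocket` class is hypothetical plain Python, not part of pyzmq, and where it raises `RuntimeError` a real socket reports a ZeroMQ error (EFSM in libzmq).

```python
# Toy model (plain Python, not pyzmq) of the send/recv alternation that
# REQ and REP sockets enforce. RuntimeError stands in for the error a
# real ZeroMQ socket reports when the order is violated.

class LockstepSocket:
    def __init__(self, kind):
        # A REQ socket must send first; a REP socket must receive first.
        self.kind = kind
        self.next_op = 'send' if kind == 'REQ' else 'recv'

    def _step(self, op):
        if op != self.next_op:
            raise RuntimeError('%s cannot %s now; expected %s'
                               % (self.kind, op, self.next_op))
        # After a send the socket must receive, and vice versa.
        self.next_op = 'recv' if op == 'send' else 'send'

    def send(self):
        self._step('send')

    def recv(self):
        self._step('recv')


req = LockstepSocket('REQ')
req.send()
try:
    req.send()  # a second request before reading the reply is rejected
    second_send_ok = True
except RuntimeError:
    second_send_ok = False
req.recv()
req.send()  # once the reply has been read, the next request is allowed

rep = LockstepSocket('REP')
rep.recv()  # a REP socket starts by receiving a request
rep.send()
```

This lockstep is exactly what the ROUTER/DEALER pair in a broker avoids, so that many requests can be outstanding at once.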

To set up a broker that forwards between the two, you need a tiny script:

import sys
import zmq

context = zmq.Context()

s1 = context.socket(zmq.ROUTER)
s2 = context.socket(zmq.DEALER)
s1.bind(sys.argv[1])
s2.bind(sys.argv[2])
zmq.device(zmq.QUEUE, s1, s2)

Note that we've used a couple of new socket types above: zmq.ROUTER and zmq.DEALER. These are similar to zmq.REP and zmq.REQ, respectively, but they allow us to break the strict "request-response" lockstep of REQ/REP. The way they work is the following:

  1. The ROUTER socket receives a message from a REQ socket. It adds a message ID (the identity of the connection the request arrived on) to the beginning of the message.
  2. The QUEUE device sends the message received on the ROUTER out through the DEALER socket.
  3. The DEALER picks one of its connected REP sockets (round-robin) and sends it the whole message, message ID included. The REP socket strips off and saves that prefix before handing the request to the application.
  4. When the application replies, the REP socket puts the saved prefix back on the front of the reply, so the DEALER receives the response with its message ID still attached.
  5. The QUEUE device sends the message received on the DEALER out through the ROUTER socket.
  6. The ROUTER socket strips the message ID and uses it to send the reply to the REQ socket that made the original request.
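
The round trip above can be traced with a toy model in plain Python. It is not the pyzmq API: the helper functions and the client/worker names are hypothetical, and a message is just a list of byte-string frames. It does reflect how libzmq behaves: a REQ socket adds an empty delimiter frame, the ROUTER adds the identity frame, the DEALER passes frames through unchanged, and the REP socket saves everything up to the delimiter and restores it on the reply.

```python
from itertools import cycle

# Toy model (plain Python, not pyzmq) of the frames that pass through a
# ROUTER/DEALER QUEUE device. A message is a list of byte-string frames;
# the client identities and worker names are made up for illustration.

def req_send(body):
    # A REQ socket puts an empty delimiter frame in front of the body.
    return [b'', body]

def router_recv(identity, frames):
    # Step 1: the ROUTER prepends the identity of the incoming connection.
    return [identity] + frames

def rep_handle(frames, handler):
    # The REP socket splits off the envelope (everything up to and
    # including the empty delimiter), gives the body to the application,
    # and puts the envelope back in front of the reply.
    split = frames.index(b'') + 1
    envelope, body = frames[:split], frames[split:]
    return envelope + [handler(body[0])]

def router_send(frames):
    # Step 6: the ROUTER pops the identity frame and uses it as the address.
    return frames[0], frames[1:]

workers = cycle(['worker-A', 'worker-B'])  # the DEALER round-robins its peers
delivered = {}
for client in [b'client-1', b'client-2', b'client-3']:
    msg = router_recv(client, req_send(b'hello'))
    worker = next(workers)  # the DEALER forwards the frames unchanged
    reply = rep_handle(msg, lambda body: b'x-' + body)
    address, frames = router_send(reply)
    # The REQ socket drops the empty delimiter and returns the body.
    delivered[address] = (worker, frames[1])
```

Each client gets its own reply even though requests alternate between two workers, which is what lets several requests be in flight at once.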

By using message IDs as above, we can have multiple messages "in flight", being handled by various servers. If you'd rather not use the built-in ZeroMQ device, you can build your own fairly simply. The code below shows a device that uses a pair of threads to relay messages from one socket to another:

import sys
import zmq
import time

context = zmq.Context()

s1 = context.socket(zmq.ROUTER)
s2 = context.socket(zmq.DEALER)
s1.bind(sys.argv[1])
s2.bind(sys.argv[2])

def zeromq_relay(a, b):
    '''Copy data from zeromq socket a to zeromq socket b'''
    while True:
        msg = a.recv()
        more = a.getsockopt(zmq.RCVMORE)
        if more:
            b.send(msg, zmq.SNDMORE)
        else:
            b.send(msg)

def zmq_queue_device(s1, s2):
    import threading
    t1 = threading.Thread(target=zeromq_relay, args=(s1,s2))
    t2 = threading.Thread(target=zeromq_relay, args=(s2,s1))
    t1.daemon = t2.daemon = True
    t1.start()
    t2.start()
    while True:
        time.sleep(10)

zmq_queue_device(s1, s2)

WARNING: Though the code above seems to work, as Dan Fairs points out, ZeroMQ sockets are not thread-safe, and here each socket is used by two threads (one receives on it while the other sends on it). What you really want to do is use nonblocking I/O and zmq.Poller(), but that's a bit beyond the scope of what I wanted to cover in this article.
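
The idea behind the poller approach is that a single thread waits on both sides of the device and relays from whichever side is readable, so no socket is ever touched by two threads. With pyzmq you would register both device sockets on a zmq.Poller with zmq.POLLIN and loop over its poll() method. The sketch below shows the same loop shape using the standard library's select.select on plain socket pairs as a stand-in; unlike ZeroMQ sockets these are byte streams with no message boundaries, and the function and socket names are hypothetical.

```python
import select
import socket

# Stand-in for a zmq.Poller-based device: one thread waits on both sides
# and relays whichever side is readable. Plain socket pairs replace the
# ZeroMQ sockets, so there are no message boundaries here.

def relay_ready(peers, timeout=1.0):
    """Wait up to `timeout` seconds, then copy data from each readable
    socket to the socket it is paired with. Returns how many were relayed."""
    readable, _, _ = select.select(list(peers), [], [], timeout)
    moved = 0
    for src in readable:
        data = src.recv(4096)
        if data:
            peers[src].sendall(data)
            moved += 1
    return moved

# client <-> frontend and backend <-> worker are the device's two sides.
client, frontend = socket.socketpair()
backend, worker = socket.socketpair()
peers = {frontend: backend, backend: frontend}

client.sendall(b'request')
relay_ready(peers)           # frontend was readable: copied to backend
request = worker.recv(4096)

worker.sendall(b'x-' + request)
relay_ready(peers)           # backend was readable: copied to frontend
reply = client.recv(4096)

for s in (client, frontend, backend, worker):
    s.close()
```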


Forwarding device

In the same way the QUEUE device mediates the REQ/REP pattern, the FORWARDER device mediates the PUB/SUB pattern. Since PUB/SUB doesn't require the lockstep operation of REQ/REP, we can use the regular PUB/SUB sockets in our device:

import sys
import zmq
context = zmq.Context()

s1 = context.socket(zmq.SUB)
s2 = context.socket(zmq.PUB)

s1.bind(sys.argv[1])
s2.bind(sys.argv[2])

s1.setsockopt(zmq.SUBSCRIBE, '')

zmq.device(zmq.FORWARDER, s1, s2)

Now we can connect one or more publishers to our device's "upstream" port (sys.argv[1]), and one or more subscribers to the device's "downstream" port (sys.argv[2]) to provide a PUB/SUB broker. Note in particular that we had to subscribe to all messages in the device code since we don't know which messages our downstream sockets are interested in.

If you'd rather filter the messages that get forwarded, you can subscribe to some subset of messages. Unfortunately, I'm not aware of any way to forward only messages that the downstream clients have subscribed to using built-in ZeroMQ functionality.
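
Subscribing to a subset works because ZeroMQ matches each subscription as a byte prefix of the message (for multipart messages, of the first frame), so calling setsockopt(zmq.SUBSCRIBE, 'weather:') instead of '' narrows what the device forwards. The toy model below illustrates that rule in plain Python; it is not pyzmq, and the topic names are hypothetical.

```python
# Toy model (plain Python, not pyzmq) of SUB-socket filtering: a message
# is accepted if any subscription is a byte prefix of it. The empty
# subscription b'' is a prefix of everything.

def accepts(subscriptions, message):
    """Return True if any subscription is a prefix of the message."""
    return any(message.startswith(prefix) for prefix in subscriptions)

messages = [b'weather:rain', b'stocks:ACME 10.5', b'weather:sun']

# Subscribing to b'' (as the device above does) forwards every message.
forward_all = [m for m in messages if accepts([b''], m)]

# Subscribing to a prefix forwards only the matching messages.
forward_weather = [m for m in messages if accepts([b'weather:'], m)]

# A SUB socket with no subscriptions at all receives nothing.
forward_none = [m for m in messages if accepts([], m)]
```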

If we want to write our own FORWARDER device, it's even simpler than the QUEUE device since it only handles unidirectional communication. Assuming we have the zeromq_relay function as defined above, our FORWARDER device is just the following:

def zmq_forwarder_device(upstream, downstream):
    zeromq_relay(upstream, downstream)
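
zeromq_relay forwards one frame at a time, and it is the RCVMORE flag, copied over as SNDMORE, that keeps multipart messages intact on the far side. The toy model below shows why that flag matters; it is plain Python rather than pyzmq, with each frame represented as a hypothetical (frame, more) pair.

```python
# Toy model (plain Python, not pyzmq) of multipart framing. Each frame on
# the wire is a (frame, more) pair; more=False marks a message's last frame.

def reassemble(frames):
    """Group (frame, more) pairs back into whole messages, as a receiver would."""
    messages, current = [], []
    for frame, more in frames:
        current.append(frame)
        if not more:  # the last frame of a message has more == False
            messages.append(current)
            current = []
    return messages

# A three-frame message followed by a one-frame message.
wire = [(b'topic', True), (b'part-1', True), (b'part-2', False), (b'solo', False)]

# What zeromq_relay does: carry each RCVMORE flag over as SNDMORE.
relayed = [(frame, more) for frame, more in wire]

# A relay that ignored RCVMORE would end every message after one frame.
naive = [(frame, False) for frame, _ in wire]
```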

Streaming device

Similar to the FORWARDER device, the STREAMER device just sends upstream messages downstream, but in this case in support of the PUSH/PULL pattern rather than PUB/SUB. To make a STREAMER broker, then, we just need the following code:

import sys
import zmq
context = zmq.Context()

s1 = context.socket(zmq.PULL)
s2 = context.socket(zmq.PUSH)

s1.bind(sys.argv[1])
s2.bind(sys.argv[2])

zmq.device(zmq.STREAMER, s1, s2)

And once again, if we wanted to create the device manually, it's just a relay:

def zmq_streaming_device(upstream, downstream):
    zeromq_relay(upstream, downstream)
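
Although the two one-way devices are built from the same relay, the sockets on their downstream side distribute messages differently: a PUB socket gives every connected subscriber a copy of each message, while a PUSH socket hands each message to a single connected PULL peer, rotating among them. The toy model below contrasts the two in plain Python; it is not pyzmq, and real PUSH sockets only rotate among peers that are ready to accept messages.

```python
from itertools import cycle

# Toy model (plain Python, not pyzmq) contrasting PUB fan-out with
# PUSH round-robin distribution. Peer names are hypothetical.

def pub_fanout(messages, subscribers):
    """Every subscriber gets its own copy of every message."""
    return {s: list(messages) for s in subscribers}

def push_round_robin(messages, workers):
    """Each message goes to exactly one worker, taking turns."""
    inbox = {w: [] for w in workers}
    for worker, msg in zip(cycle(workers), messages):
        inbox[worker].append(msg)
    return inbox

jobs = [b'job-0', b'job-1', b'job-2', b'job-3', b'job-4']
fanout = pub_fanout(jobs, ['sub-1', 'sub-2'])
inbox = push_round_robin(jobs, ['worker-1', 'worker-2'])
```

This is why PUSH/PULL through a STREAMER suits spreading work across a pool, while PUB/SUB through a FORWARDER suits broadcasting.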

Integration with gevent

You may have noticed that the zmq.device function doesn't return. If you want to create multiple devices within your Python program, then, you'll need to run each device in its own thread.

Another approach, if you, like me, prefer the lightweight threading and asynchronous I/O of gevent, is to use the gevent-zeromq package available from PyPI:

$ pip install gevent-zeromq

Now we can use a 'green' version of ZeroMQ by just importing from the gevent-zeromq wrapper in our scripts. A "pusher" script would then look like this:

import sys
import time

import gevent
from gevent_zeromq import zmq

context = zmq.Context.instance()
sock = context.socket(zmq.PUSH)
sock.connect(sys.argv[1])

while True:
    # gevent.sleep yields to other greenlets; time.sleep would block them all
    gevent.sleep(1)
    sock.send(sys.argv[1] + ':' + time.ctime())

Simple, right? I include this in an otherwise seemingly unrelated post because if you want to use gevent, you can't use the built-in devices. The built-in devices block, and they block inside the ZeroMQ C library, not in Python where they could be "greened". So if you want a device with gevent, you'll have to write your own (which is, after all, pretty straightforward).
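
A hand-written cooperative device is just two relay loops, one per direction, running as lightweight tasks in a single thread, which is why the thread-safety problem disappears. Since gevent and gevent_zeromq may not be installed everywhere, the sketch below swaps in Python 3's asyncio to show that structure; asyncio.Queue objects stand in for the device's sockets, and the `relay` function and its `count` argument are hypothetical (a real device would loop forever).

```python
import asyncio

# Stand-in for a hand-written gevent device: two cooperative relay tasks
# share one thread, the way two greenlets each running zeromq_relay would.
# This is asyncio, not gevent; queues play the role of the sockets.

async def relay(src, dst, count):
    """Copy `count` messages from src to dst, yielding while it waits."""
    for _ in range(count):
        msg = await src.get()  # suspends this task so others can run
        await dst.put(msg)

async def main():
    frontend_in, backend_out = asyncio.Queue(), asyncio.Queue()
    backend_in, frontend_out = asyncio.Queue(), asyncio.Queue()
    tasks = [
        asyncio.create_task(relay(frontend_in, backend_out, 2)),
        asyncio.create_task(relay(backend_in, frontend_out, 2)),
    ]
    for request in [b'0', b'1']:
        await frontend_in.put(request)      # a client sends a request
        work = await backend_out.get()      # a worker receives it
        await backend_in.put(b'x-' + work)  # and sends back a reply
    await asyncio.gather(*tasks)
    return [frontend_out.get_nowait() for _ in range(2)]

replies = asyncio.run(main())
```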

Conclusion

ZeroMQ devices provide a handy way to stitch together complex routing topologies and allow you to decouple the various components of your architecture. Though the built-in devices are quite simple, they provide insight into how you could build more complex devices yourself to fulfill the role of "broker" in a distributed architecture.

I'd be interested in hearing from you how you are using devices (or whether you find them useful at all) in ZeroMQ programming. Do you use the built-in devices? Any devices you write yourself? Do you prefer the multithreaded device approach I've used here, or using the zmq.Poller object? Let me know in the comments below!

6 comments:

  1. "...combine the easy networking topologies of ZeroMQ with the cooperative multitasking of ZeroMQ"


    That should probably be the "cooperative multitasking of gevent", right?

    1. You are absolutely correct. Thanks for catching that for me! I've corrected the article.

  2. Hi there!

    I think your example using threads is a little dangerous, as you're sharing sockets between threads. This is unsupported. See the section on 'thread safety' here:

    http://api.zeromq.org/2-1:zmq-socket

    Otherwise, thanks for a great little article!

    1. Dan,

      Thanks for the comment! I had not realized that zmq sockets were not thread-safe. My guess is that the only reason I haven't seen crashes is that I only send in one thread and recv in the other, but it's certainly a bad approach. (Of course, with gevent_zeromq, the problem goes away since everything is implemented in a single thread.)

      Great catch!

  3. Anonymous, 1:38 AM

    Any example for using context in multiprocess?

    1. Unfortunately I don't have an example handy right now, but if I recall correctly, things work best if you create the context after forking your subprocesses. (And if you're talking about the multiprocessing library in Python in particular, I don't have any experience making that work with 0MQ.)
