In a previous post, I provided an introduction to ZeroMQ. Continuing along with ZeroMQ, today I'd like to take a look at how you manage various "socket options" in ZeroMQ, particularly when it comes to flow control. If you've never used ZeroMQ, I recommend reading my previous post first. Once you're caught up, let's get started...
ZeroMQ "fire and forget"
One of the awesome things you may have noticed about ZeroMQ is that you can send a message without waiting for it to be received. In fact, the endpoint that's going to receive the message doesn't even need to be connected, and your application will happily act as if the message is winging its way along. While this is great for easily getting up to speed with ZeroMQ, there are some things you need to be aware of.
ZeroMQ is not magic; merely a close approximation
When you send a message on a ZeroMQ socket, ZeroMQ stores that message internally in an in-memory queue. So long as you don't send messages faster than whatever's downstream can read them, all is well. The problem comes when your downstream end can't process them fast enough.
Let's consider the following sender, which will send a short message as quickly as possible:
    import sys
    import time
    import zmq

    context = zmq.Context()
    sock = context.socket(zmq.PUSH)
    sock.connect(sys.argv[1])

    while True:
        sock.send(sys.argv[1] + ':' + time.ctime())
Now if we just run this without having a corresponding 'puller' socket draining the queue, we'll eat up memory as the 'pusher' just keeps enqueueing messages. On my laptop, for instance, the python process reached 3GB of virtual memory size in two minutes. Of course, in a real system you'll have a pulling socket, but in many cases your 'pulling' socket may not be able to pull as fast as your pusher can push.
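To make that growth concrete, here is a small stand-alone model of an unbounded send queue. It doesn't use ZeroMQ at all: the deque stands in for ZeroMQ's internal queue, and the function name and the rates are made up for illustration. It's written for Python 3.

```python
import collections


def simulate_backlog(send_rate, recv_rate, seconds):
    """Return the queue depth after each second.

    send_rate and recv_rate are messages per second. Both numbers are
    hypothetical; they are not measurements of ZeroMQ.
    """
    backlog = collections.deque()  # stand-in for the in-memory send queue
    depths = []
    for _ in range(seconds):
        # The pusher enqueues a full second's worth of messages...
        backlog.extend(b"msg" for _ in range(send_rate))
        # ...and the puller drains as many as it can keep up with.
        for _ in range(min(recv_rate, len(backlog))):
            backlog.popleft()
        depths.append(len(backlog))
    return depths


depths = simulate_backlog(send_rate=1000, recv_rate=600, seconds=5)
print(depths)  # [400, 800, 1200, 1600, 2000]
```

As long as the pusher outpaces the puller, the backlog grows by the difference every second and never shrinks.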
High water mark to the rescue
To handle situations like this, ZeroMQ provides a socket option called a high water mark, accessed as zmq.HWM. This tells us how many messages we want ZeroMQ to buffer in RAM before blocking the 'pushing' socket. To set the high water mark, we just need to use the .setsockopt method:
    ...
    sock = context.socket(zmq.PUSH)
    sock.setsockopt(zmq.HWM, 1000)
    sock.connect(sys.argv[1])

    while True:
        sock.send(sys.argv[1] + ':' + time.ctime())
The modified pusher will send 1000 messages and then block, using a maximum of 2.3MB on my laptop. Note that the high water mark must be set before you connect to any clients, as ZeroMQ uses a queue-per-client, and fixes the queue size on connect.
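If you want to see the "fill up, then stop" behavior without a ZeroMQ install, a bounded queue from the Python 3 standard library behaves in a similar way. This is only a sketch of the idea, not ZeroMQ's implementation: push_until_full is a hypothetical helper, and it uses a non-blocking put so that the script finishes instead of hanging the way a blocking send would.

```python
import queue

HIGH_WATER_MARK = 1000  # same figure as the pusher above


def push_until_full(hwm):
    """Enqueue messages until the bounded queue refuses one.

    Returns how many messages were accepted before the queue was full.
    """
    if hwm <= 0:
        # queue.Queue treats maxsize <= 0 as "no limit", so this loop
        # would never end; refuse that case explicitly.
        raise ValueError("hwm must be a positive number of messages")
    outgoing = queue.Queue(maxsize=hwm)
    accepted = 0
    while True:
        try:
            # A blocking put() would wait here, like the blocked pusher.
            outgoing.put_nowait("message %d" % accepted)
        except queue.Full:
            return accepted
        accepted += 1


accepted = push_until_full(HIGH_WATER_MARK)
print(accepted)  # 1000
```

The memory used is capped by the queue size rather than by how long the program runs, which is the whole point of the high water mark.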
Queueing messages on disk
There is one case in which a sending socket may exceed its high water mark. When you set the zmq.SWAP option on a socket, ZeroMQ will use a local swapfile to store messages that exceed the high water mark. In order to set up a 200KB on-disk swap file, for instance, we could use the following code:
    ...
    sock = context.socket(zmq.PUSH)
    sock.setsockopt(zmq.HWM, 1000)
    sock.setsockopt(zmq.SWAP, 200*2**10)
    sock.connect(sys.argv[1])

    while True:
        sock.send(sys.argv[1] + ':' + time.ctime())
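It helps to work out what 200*2**10 actually buys you. The arithmetic below is a rough estimate only: the endpoint and timestamp are made-up sample values, and I'm assuming the swap file stores nothing but the message bodies, which ignores whatever per-message overhead ZeroMQ adds on disk. Treat the result as an upper bound.

```python
SWAP_BYTES = 200 * 2**10  # the value passed to zmq.SWAP above: 204800 bytes


def swap_capacity(swap_bytes, message_bytes):
    """Upper bound on how many messages of a given size fit in the swap.

    Assumes zero per-message overhead, which is an idealization.
    """
    if message_bytes <= 0:
        raise ValueError("message_bytes must be positive")
    return swap_bytes // message_bytes


# A sample message shaped like the pusher's: endpoint + ':' + time.ctime().
# The endpoint and the timestamp are hypothetical.
message = "tcp://127.0.0.1:5555" + ":" + "Sat Jan  1 12:00:00 2011"

print(SWAP_BYTES)                               # 204800
print(len(message))                             # 45
print(swap_capacity(SWAP_BYTES, len(message)))  # 4551
```

So with messages this small, a 200KB swap file gives the pusher room for a few thousand extra messages beyond the 1000 held in memory, not an open-ended buffer.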
Lingering messages
ZeroMQ is designed to deliver messages as reliably as possible by default. One way it does this is by allowing outgoing messages to 'linger' in their queues even when the socket that sent them has been closed. For instance, suppose we have the following single-message pusher:
    import sys
    import time
    import zmq

    context = zmq.Context()
    sock = context.socket(zmq.PUSH)
    sock.connect(sys.argv[1])
    sock.send(sys.argv[1] + ':' + time.ctime())
    print 'Exiting...'
Now if we run this without a corresponding "pull" socket, our program will print that it's exiting but never actually exit. This is because, by default, the ZeroMQ communication thread hangs around until all its outgoing messages have been sent, even if the socket is closed. To modify this behavior, we can set the zmq.LINGER option on the socket, which gives the maximum amount of time in milliseconds that the thread will try to send messages after its socket has been closed (the default value of -1 means to linger forever):
    ...
    sock = context.socket(zmq.PUSH)
    sock.setsockopt(zmq.LINGER, 1000)
    sock.connect(sys.argv[1])
    sock.send(sys.argv[1] + ':' + time.ctime())
    print 'Exiting...'
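The linger rule itself is simple enough to model in a few lines of plain Python 3. The sketch below is my own approximation of the behavior described above, not ZeroMQ's code: close_with_linger, try_send, and no_peer are hypothetical names, and the polling loop stands in for the background communication thread.

```python
import time


def close_with_linger(pending, try_send, linger_ms, poll_interval=0.01):
    """Try to flush pending messages for at most linger_ms milliseconds.

    linger_ms = -1 keeps retrying forever, mirroring the default.
    Returns the messages that were still unsent when the deadline passed.
    """
    deadline = None if linger_ms < 0 else time.monotonic() + linger_ms / 1000.0
    pending = list(pending)
    while pending:
        if try_send(pending[0]):
            pending.pop(0)  # delivered, move on to the next message
            continue
        if deadline is not None and time.monotonic() >= deadline:
            break  # linger period is over, give up on the rest
        time.sleep(poll_interval)
    return pending


def no_peer(message):
    """Simulates a pusher with nobody connected: every send fails."""
    return False


start = time.monotonic()
dropped = close_with_linger(["hello"], no_peer, linger_ms=100)
elapsed = time.monotonic() - start
print(dropped)  # ['hello']
print("gave up after %.2f seconds" % elapsed)
```

With no peer, the call returns after roughly the linger period and reports the message as undelivered. Passing linger_ms=-1 to the same call would never return, which is the "saying it's exiting, but never really exiting" behavior from the first version.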
Other options
In the previous article, we already saw the zmq.SUBSCRIBE and zmq.UNSUBSCRIBE options. There are also a number of other socket options available for use with setsockopt. Several of these (zmq.RATE, zmq.RECOVERY_IVL, zmq.RECOVERY_IVL_MSEC, and zmq.MCAST_LOOP) have to do with the multicast transports (pgm and epgm), so I won't go into them here. Two more (zmq.RECONNECT_IVL and zmq.RECONNECT_IVL_MAX) control how often a disconnected socket retries its connection. Others (zmq.SNDBUF, zmq.RCVBUF, and zmq.BACKLOG) have to do with the underlying OS sockets.
There is one option that's potentially a bit more interesting: zmq.IDENTITY. From the ZeroMQ docs on setsockopt:
If the socket has no identity, each run of an application is completely separate from other runs. However, with identity set the socket shall re-use any existing ØMQ infrastructure configured by the previous run(s). Thus the application may receive messages that were sent in the meantime, message queue limits shall be shared with previous run(s) and so on.
So if you create a socket and give it the same identity it had in a previous run, it picks up where that run left off, including any messages queued for it in the meantime and the queue limits already in place. Beware of overusing this setting, however, since it may be removed soon.
Conclusion
With the setsockopt method, ZeroMQ gives you lower-level control over its sockets than the "fire-and-forget" model suggests. Particularly if you're building a "pipeline" style application, you need to be aware of its flow control features. (I have particular experience with errors caused by not understanding the zmq.HWM option, for example.)
There is, of course, still more to cover here. In particular, I'll cover the use of devices to construct more elaborate network topologies and the use of ZeroMQ with gevent in future articles. I'd also love to hear about other topics you'd like to read about, so let me know in the comments below!
Nice article on the basics again. The zeromq guide does not do great justice to the PUSH/PULL socket types for larger applications, the focus really seems to be on REQ/REP variants and PUB/SUB topologies.
It would be great if you could delve more into the PUSH/PULL variants, applications and performance tuning tips.
Thanks for the comment! I'll be touching on PUSH/PULL devices as well as REQ/REP and PUB/SUB devices in future articles.
It's interesting that REQ/REP is a focus of the zeromq guide, since these are particularly prone to problems due to their statefulness (what happens when the REP side crashes while handling a message, for instance?)
Great !!
Also I have been thinking a lot about Poller vs Tornado's event loop for handling multiple sockets. I would like to know your opinion on them as well !!!
Thanks for the comment -- I don't actually have an opinion on Tornado vs Poller, not having used either of them, but I'll keep them in mind for the future.