Samuel Tardieu @

Responsible workers with ØMQ


I stumbled upon several questions on StackOverflow where people asked about safely interrupting distributed workers communicating through the ØMQ middleware.

Most of the ØMQ examples describing workers pools assume that jobs are pushed to the workers in a round-robin way. The first worker receives a job, the second one receives a job, and so on, then the first worker receives yet another job, the second one… Well, you get the idea. Unfortunately, not all jobs are necessarily created equal, and the workers may be running on computers with different processing capabilities and workloads.

As an example of a different way to do it, I wrote a simple Python broker named that distributes tasks on demand. When a worker is ready to work, it asks for some job to perform, receives one if one is available, does the computation, and sends the answer back. This way, no task should ever be sent to a worker which is busy doing other things, possibly for a long time.

The broker also checks that the answer to a job comes back within a given time-frame. If it does not, it assumes that the worker has crashed or is overwhelmed by other tasks, and sends the job again to another worker. Another parameter may be specified: the number of times to attempt to run each job. If a job description causes workers to raise an exception repeatedly, it may be a good idea to abort it and not try to run it indefinitely. If a job is aborted by the broker, an empty answer will be sent to the client so that it knows that its request could not be completed.

Of course, this sample broker is far from perfect, and many things could be changed for the better:

  • If the broker is restarted, workers will not receive tasks anymore. This can be easily fixed by having the broker reissue their task requests from time to time, which would require using a XREQ ØMQ socket instead of a REQ one to allow out of sequence exchanges.

  • If the broker is restarted, queued requests will be lost. Each request could be accompanied by an unique id generated by the client, and asked about if the answer does not arrive in a given time. This would also give a way to cancel pending requests if the client realizes that it does not need them to be executed anymore.

  • Timeouts and number of retries could be configurable for each request rather than globally.

Nonetheless, it should be enough to answer some questions and show how to do things differently.

A sample worker module is also available in the repository. It provides a Worker class that can be derived from; one must also override one of the process or process_multipart method with a function doing the real work in the child class. The inherited methods will take care of communicating with the broker.

{% include github.html

blog comments powered by Disqus