Feature #704

Distributed task queue

Added by Jackson, Mike over 12 years ago. Updated over 12 years ago.

Jackson, Mike
Online reconstruction
Start date: 20 September 2011


Component to take inputs and spawn threads, or new processes, to execute chains of map workers. Based on Celery - Python distributed task queue.


Updated by Tunnell, Christopher over 12 years ago

Hint: birth() can be done by looking at the modules that get loaded when a Celery worker-machine initializes.


Updated by Jackson, Mike over 12 years ago

From MAUS SSI Component Design - Online Reconstruction:

  • On input of JSON document, want to spawn distributed map job. One map group job (pipeline of map workers) spawned per second corresponding to their arrival from the input. The same map group job is run in different threads/machines.
    • No need at this time for data streaming. That is, different map workers in the same worker working on different JSON documents concurrently.
  • Use the Celery asynchronous distributed task queue in Python.
    • Invoking MapGroup "birth" function during Celery "import modules" step will save development effort.
  • Input goes into the map group.
  • JSON document output is deposited in data store.
  • Useful if map worker instances (and map group jobs) had unique IDs, for logging and debugging purposes.
  • Initial prototype could use a mock wrapper round InputData to sleep then emit a JSON document every second. Could read JSON from directory or do a version that creates random JSON documents.
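A minimal sketch of the mock input suggested in the last bullet. The function name mock_spills, the interval parameter and the document fields are illustrative assumptions, not part of MAUS; a real prototype would wrap InputData instead.

```python
import json
import random
import time


def mock_spills(count, interval=1.0, seed=None):
    """Yield `count` random JSON documents, sleeping `interval` seconds before each.

    Hypothetical stand-in for an input wrapper; the field names are made up.
    """
    rng = random.Random(seed)
    for spill_number in range(count):
        time.sleep(interval)  # simulate waiting for the next document to arrive
        document = {
            "spill_number": spill_number,
            "payload": [rng.random() for _ in range(3)],
        }
        yield json.dumps(document)
```

Iterating over mock_spills(10) would emit ten documents at roughly one per second; a variant could read JSON files from a directory instead of generating random content.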

Updated by Jackson, Mike over 12 years ago

Installed Celery:

$ easy_install celery

Needs RabbitMQ (message broker) install. Installed RedHat version:
$ yum install rabbitmq-server
 rabbitmq-server        noarch        2.2.0-1.el5          epel           890 k
Installing for dependencies:
 erlang                 i386          R12B-5.10.el5        epel            39 M
 unixODBC               i386          2.2.11-7.1           sl-base        832 k

Tested RabbitMQ:
$ sudo su -
$ /sbin/service rabbitmq-server start
Starting rabbitmq-server: SUCCESS
$ /sbin/service rabbitmq-server status
Status of all running nodes...
Node 'rabbit@maus' with Pid 26747: running
$ /sbin/service rabbitmq-server stop
Stopping rabbitmq-server: rabbitmq-server.

Configured for Celery:
$ rabbitmqctl add_user maus suam
Creating user "maus" ...
Error: unable to connect to node rabbit@maus: nodedown
- nodes and their ports on maus: [{rabbitmqctl26922,51651}]
- current node: rabbitmqctl26922@maus
- current node home dir: /var/lib/rabbitmq
- current node cookie hash: PMg5ptghNPyeqPO424Y7uw==

Duh, need to start the server first:
$ rabbitmqctl add_user maus suam
Creating user "maus" ...
$ rabbitmqctl add_vhost maushost
Creating vhost "maushost" ...
$ rabbitmqctl set_permissions -p maushost maus ".*" ".*" ".*" 
Setting permissions for user "maus" in vhost "maushost" ...

Worked through the Celery first steps.

To run a Celery worker server (in the foreground, so execution can be seen):

$ celeryd --loglevel=INFO

To run multiple workers on one machine (each with a pool of 10 concurrent workers):
$ celeryd --loglevel=INFO --concurrency=10 -n
$ celeryd --loglevel=INFO --concurrency=10 -n
$ celeryd --loglevel=INFO --concurrency=10 -n

-n sets a custom host name. This can be anything, e.g. maus.zonk.


Updated by Jackson, Mike over 12 years ago

Chris T, if you could dump any notes you made here that'd be useful, thanks.


Updated by Jackson, Mike over 12 years ago

A question in the use of Celery is how to get the MapPyGroup to the worker. The naive approach is that the caller/client passes the MapPyGroup plus configuration to the worker. The worker then creates the MapPyGroup, initialises it, runs it, and returns the result.

def execute_transform_naive(json_config_doc, transform, spill):
    transform.birth(json_config_doc)  # initialised on every spill
    return transform.process(spill)

The upside is that the workers can be started without any configuration associated with a specific client: they're standalone. The downside is that the transform (MapPyGroup) is initialised on every spill. It also needs the members of the MapPyGroup to be serializable. Not all of them are, e.g. sending the MapPyGroup in bin/ gives:
TypeError: can't pickle SwigPyObject objects

Another approach would be for a specification of the MapPyGroup to be sent to a worker (rather than the actual objects). This would be straightforward but still incurs the initialise-on-every-spill overhead.
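A small sketch of why sending a specification sidesteps the serialisation problem. UnpicklableMapper and can_pickle are hypothetical names for illustration; a threading.Lock stands in for a SWIG-wrapped object, which is an assumption about the failure mode rather than MAUS code.

```python
import pickle
import threading


class UnpicklableMapper(object):
    """Hypothetical mapper holding a resource that pickle cannot serialise."""

    def __init__(self):
        self.handle = threading.Lock()  # stands in for a SWIG-wrapped C++ object


def can_pickle(obj):
    """Return True if obj survives pickle.dumps, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, pickle.PicklingError):
        return False


# A specification is just data (here, mapper names), so it pickles without trouble.
specification = ["MapPyDoNothing", "MapCppSimulation"]
```

Here can_pickle(UnpicklableMapper()) is False while can_pickle(specification) is True, so only the names would need to cross the process boundary.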

A third approach, as suggested (the above is just me reasoning my way to the suggestion), is to have the "import modules" feature of Celery (done by the Celery Loader.on_worker_init method) handle MapPyGroup configuration. The question then is how to get the MapPyGroup into the worker. So I have assumed:

  • User writes a Celery task that includes their MapPyGroup.
  • User starts worker(s) with their desired MAUS configuration file and which reads their Celery task.
  • Celery loads in the user's Celery task.
  • MAUS custom Celery loader loads and validates the configuration, invokes birth on the user's task which, in turn, passes this to the MapPyGroup.
  • just submits spills to the worker via the user's task.
  • User is responsible for ensuring that the configuration used for the worker(s) and is consistent.

Commit 671 is a prototype of the above with a custom Celery loader that can load MAUS configuration, invoke birth on a Celery task which then in turn invokes birth on the MapPyGroup:

  • Added easy_install celery to third_party/bash/40python_extras.bash
  • Changed src/common_py/ - renamed control_room_style to multi_process and prototyped implementation of multi_process method.
  • Added src/celery with custom Celery loader for MAUS, Celery configuration and prototype MAUS tasks.
  • Changed configure to add src/celery to the Python path and to export CELERY_LOADER with the class name of the MAUS custom Celery loader.
  • Also, added wiki page with Celery/RabbitMQ setup information at Celery Configuration

Updated by Jackson, Mike over 12 years ago

Celery allows custom remote control commands.

Commit 672 starts a prototype of this, with work-in-progress support for updating a worker's MAUS configuration and the transforms it is to apply. Once a worker is running, a user could update it by:

from celery.task.control import broadcast 
# Send new configuration doc.
broadcast("maus_reconfigure_worker", {"config_doc":"b"}, reply=True)
[{u'': {u'ok': u'maus_worker_reconfigured'}}]

broadcast("maus_set_worker_transforms", {"transforms":"[MAUS.MapPyDoNothing]"}, reply=True)
[{u'': {u'ok': u'maus_worker_transforms'}}]

This, when finished, would simplify worker usage. Workers could be created using the standard MAUS configuration. Specific users could then configure specific workers with their desired MAUS configuration and transformers, all from one client.


Updated by Tunnell, Christopher over 12 years ago

I tossed the code that I could find here:


or view on web:

but I think you have already figured out quite a bit. Are you free for a chat at one point or is it worth trying to play with what you have at the moment? I have some questions and comments is all which I'll toss in the tracker but wanted to make sure I understand what you've done first.


Updated by Tunnell, Christopher over 12 years ago

Random comment: I'm not sure how useful dynamically reconfiguring workers will be. I think the use case is something along the lines of:

1. birth is deterministic (ie. different machines can run birth and get into the same state)
2. at any time after birth(), data can be passed through process() to the worker and this is time independent. ie. it doesn't matter if spill 1 happens then spill 2, or vice versa.
3. at the end, death gets called.

Reconfiguring is unlikely ever to happen mid-run (i.e. within an hour or so of continuous data taking). However, people may restart it every few hours if they want to add new components.


Updated by Jackson, Mike over 12 years ago

So, the user will configure a worker with their MapPyGroup (wrapped in a Celery task) and desired MAUS configuration and then write a client that uses this configuration and invokes this task?

The one issue I had with the above (which motivated me looking at dynamic worker configuration) is that it puts a greater burden on the user. I didn't imagine that there would be a re-configuration mid-run but the user still has to set up and configure their server workers, then write clients that have a consistent configuration and call their server-side tasks for each run. It also implies that the same client could not be used in both single-threaded and Celery-style modes, since in Celery-style mode there would be no point in the user creating the MapPyGroup in their client since they’d be using the one configured on the worker. But, if I’m missing something here please let me know!
It’d be good to talk. I’m at a conference this week in Bonn and am off Monday/Tuesday but am back at the office Wednesday/Thursday. But I can respond to the ticket anytime and I’m happy if you want to try running the stuff now.


Updated by Tunnell, Christopher over 12 years ago

I'm sure that sounds fine. We're running the code sprint at the moment, but at some point it would be nice to just run the code. Did you hear anything about opening ports?

I'll peek at the code. But a call would be nice, I guess.


Updated by Jackson, Mike over 12 years ago

Celery workers only changed weekly/monthly/yearly. Concern if one worker is missed or not updated, what then?


Updated by Jackson, Mike over 12 years ago

Discussed with Chris T, will look at the notion of dynamic worker reconfiguration.


Updated by Jackson, Mike over 12 years ago

Spent 2 days trying to get this to work and failed. A Celery worker node creates a task pool which runs copies of each task class as processes. (This can be seen if starting a worker where there is a MainProcess and 1+ PoolWorker-N processes)

The broadcast approach above can access the task class in the MainProcess but not the copies used by the processes in the task pool. So there is no way (or no way documented, and I've Googled intensively and checked the source code) to reconfigure the task pool processes with a new MAUS configuration or a new set of mappers to apply :-(

So the original, manual configuration-and-restart approach is needed after all. Unfortunately, this means that the single-threaded approach would require different clients from the multi-threaded one (at least in terms of how the mappers are specified).


Updated by Tunnell, Christopher over 12 years ago

Well, drats. Having different code for driving single-threaded or parallel isn't the end of the world as long as the same components get executed. It can be a future request for later or something :)


Updated by Jackson, Mike over 12 years ago

Yes, the same map components get executed regardless.


Updated by Jackson, Mike over 12 years ago

Cleaned up handling of birth errors in

Tested multi-node configurations of Celery.

  • MAUS software installed on EPCC servers maus, maus2 and daiserver.
  • maus has RabbitMQ broker installed and running.
  • Edited src/mauscelery/ and changed
    BROKER_HOST = "localhost" 
  • to
    BROKER_HOST = "" 
  • Started Celery workers on maus and maus2 using debug mode so task invocations can be seen
    $ celeryd -l DEBUG
  • Ran an example client on daiserver
    ./bin/examples/ -type_of_dataflow=multi_process
  • The maus and maus2 worker outputs show that the spills are received and processed :-)

Updated by Jackson, Mike over 12 years ago

Above is commit 679

Updated information on MAUSCeleryConfiguration with how to customise Celery configuration and write a task.


Updated by Jackson, Mike over 12 years ago

Implemented another MAUS task. This creates one instance of each map worker and births these when the Celery worker is started. It takes a spill and a list of worker names to execute and passes the spill through these. MapPyGroup has a new get_worker_names function to determine the worker names to send to the Celery worker. Celery worker and still need to have the same configuration but at least the task pipelines don't need to be specified on the workers in a Python doc. We can see during live testing if this approach is problematic in any way.
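The idea of the task can be sketched in plain Python, without Celery. AppendName and GenericTransformTask are hypothetical stand-ins, not the classes in the commit; the point is only that instances are birthed once on the worker and requests carry names, not objects.

```python
class AppendName(object):
    """Toy map worker (hypothetical): records its own name in the spill."""

    def __init__(self, name):
        self.name = name
        self.born = False

    def birth(self, json_config_doc):
        self.born = True
        return True

    def process(self, spill):
        return dict(spill, history=spill.get("history", []) + [self.name])


class GenericTransformTask(object):
    """Toy task that births every map worker once, at worker start-up."""

    def __init__(self, mappers, json_config_doc):
        # One instance per map worker, keyed by name, birthed up front.
        self.mappers = dict((mapper.name, mapper) for mapper in mappers)
        for mapper in self.mappers.values():
            mapper.birth(json_config_doc)

    def run(self, workers, spill):
        # Only names arrive with the request; the instances already live here.
        for name in workers:
            spill = self.mappers[name].process(spill)
        return spill
```

A client would then call something like task.run(["A", "C"], spill), choosing which of the pre-birthed workers to apply and in what order.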

Code changes:

  easy_install celery
  export PYTHONPATH="\$MAUS_ROOT_DIR/src/mauscelery:\$PYTHONPATH" 
  export LD_LIBRARY_PATH="\$MAUS_ROOT_DIR/src/mauscelery:\$LD_LIBRARY_PATH" 
  export CELERY_LOADER="mausloader.CeleryLoader" 
   This is needed to override the default Celery loader
  Package declaration
  Celery configuration - specifies broker host, port, user, password,
  virtual host, modules to be imported by a worker e.g. "maustasks" and
  optional MAUS configuration file.
  Loads MAUS configuration and configures MAUS tasks (any Celery Task
  implementing birth()) with MAUS configuration.
  Celery tasks for common MAUS MapPyGroups.
 multi_process uses Celery tasks and classes.
 Added get_worker_names() which returns a list of worker names.

Commit 693


Updated by Jackson, Mike over 12 years ago

711 Celery task takes optional client ID and spill IDs for logging. uses hostname and PID for client ID and spill count for spill ID.
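A sketch of how such IDs could be built on the client side. The function names and the exact ID format are assumptions for illustration; only the ingredients (hostname, PID, spill count) come from the note above.

```python
import os
import socket


def make_client_id():
    """Build a client ID from the host name and process ID (format is assumed)."""
    return "%s (%d)" % (socket.gethostname(), os.getpid())


def label_spills(spills):
    """Pair each spill with a spill ID taken from a running count."""
    return [(spill_id, spill) for spill_id, spill in enumerate(spills)]
```

Passing both values along with each task invocation lets a worker log line be traced back to the client process and spill that caused it.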


Updated by Jackson, Mike over 12 years ago

Discussion about and Celery...


CR: line 452: You unpack the MapPyGroup into its component mappers and then run each one individually. Why is this?

As the Celery worker nodes run in separate processes (and eventually on separate machines) only serializable objects can be passed across in remote function invocations. Some of the Map tasks (e.g. the ones that wrap CPP implementations) cannot be serialized. So the names of the Map tasks to be executed are sent to the Celery worker rather than the objects. See foregoing for full rationale.

Ah... so you need to unpack the MapPyGroup to get the names. What if you have nested MapPyGroups?


Raised: #857
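One possible way to handle the nested case, sketched under assumptions: flatten the groups recursively so that a single ordered list of names is sent to the worker. Mapper and Group are hypothetical stand-ins for the MAUS classes, and this is not necessarily what was decided under #857.

```python
class Mapper(object):
    """Hypothetical leaf map worker identified by a name."""

    def __init__(self, name):
        self.name = name


class Group(object):
    """Hypothetical stand-in for MapPyGroup: an ordered, possibly nested, list."""

    def __init__(self, workers):
        self.workers = list(workers)

    def get_worker_names(self):
        """Return a flat list of worker names in execution order."""
        names = []
        for worker in self.workers:
            if isinstance(worker, Group):
                names.extend(worker.get_worker_names())  # recurse into nested group
            else:
                names.append(worker.name)
        return names
```

Flattening preserves execution order but loses the grouping itself, which matters only if the worker needs to know where one group ends and the next begins.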

Skipping mappers

maustasks line 43 and 58: you hard code to skip MapCppSimulation and MapPyTOFPlots - does this mean that they won't be loaded or executed at all? If so, I think this should be a NotImplemented exception (so that code doesn't silently do weird stuff under obscure circumstances, at least without telling user and giving a line reference/stack trace pointing to the obscurity)

MapCppSimulation takes ages to create which makes edit-test-debug cycles take a while. That was an accidental check in.

MapPyTOFPlot is more problematic as it kicks up a PyROOT canvas which can be confusing to the user. Though I could just change the Celery worker initialisation to force PyROOT batch mode?

The point is if you are checking emails on a machine that is also acting as a celery worker it might just throw up random canvases? Yeah, forcing batch mode sounds sensible.

Addressed - see 717

Configuring Celery workers

mausloader line 96: why do you do conf.has_key("MAUS_CONFIG_FILE") - shouldn't this be "maus_configuration_file" key?

Likewise, - this should be accessible from ConfigurationDefaults maybe?

MJ: is only used by Celery and Celery needs it to be specified via a CELERY_CONFIG_MODULE environment variable or a Celery worker command-line option (celeryd --config=mauscelery.celeryconfig) so I'm not sure of the value of specifying it in ConfigurationDefaults too.

What I'm trying to avoid is "if I'm doing this I use this configuration file, if I'm doing that I use that configuration file". Better to have all the setup information in one place. Also means that e.g. (when we get round to it) we will write all this configuration information to the output stream, be nice to include the celery configuration in that. Maybe means writing the celery config file at runtime or something equally ugly.

I've just done a test and we can use to hold the current content of and specify (using CELERY_CONFIG_MODULE or the --config argument of celeryd) that Celery uses as its configuration file too.

In Configuration is used to

  • Load the ConfigurationDefaults
  • Add the command-line arguments
  • Get the configuration_file value and add the values in this file.
  • Add the configuration values provided as arguments to as a file handle.

For Celery we can't provide MAUS arguments to a worker via the command-line. So specifying a MAUS_CONFIG_FILE value in was intended to provide an alternative way of doing this. This could just be removed and the user can edit instead of if they want to override defaults.

Could you load the executable as normal, taking command line arguments, then write to a temp file, then use that to load celery workers? Would avoid having "well if you're running in this mode, you can't use command line arguments, if you're running in that mode you can" type issues. (I find that the simpler the documentation, the better the interface).

When using Celery the user has to set each Celery worker process running and also run to feed the spills to these. The Celery workers are started at the command-line using Celery's own "celeryd" executable.

"celeryd" throws an error if it encounters a non-celeryd parameter. similarly throws an error if it encounters a non-MAUS parameter. So, there would be a need to write a wrapper for celeryd which takes both MAUS and celeryd arguments, pulls out the MAUS arguments into the temporary file, then removes the MAUS arguments from whatever in Python holds the command-line arguments before invoking celeryd. This would require care also that MAUS and celeryd avoid the same command-line argument names.

For the configuration, you could just have (if you don't think it's ugly, but might be what Chris was saying) in ConfigurationDefaults:

celery_config_file_contents_string = """blah = blah
blah"""

import tempfile
f = tempfile.NamedTemporaryFile(mode="w", suffix=".py")
f.write(celery_config_file_contents_string)
f.flush()
# Don't f.close(): closing deletes the file. Just let Python get rid of it when it shuts down.

then send f.name onward? I think Chris wants to be able to do json.dumps(configurationdefaults) and have that be every bit of information possible about how the code was run. (All pseudo-code above.)

The entry point for a Celery worker is celeryd which expects to find a Celery configuration, so that will always be loaded before ConfigurationDefaults unless the convoluted wrapper approach I suggested above is adopted.

Ah right... oh well. A convoluted approach is worse than a clear approach in a VCS? Hopefully the revision number alone determines the maus_celery_config...

So currently you

  • initialise the workers
  • hand the mapper names to the workers
  • workers initialise the mappers
  • execute

How do you know what mappers to initialise if you don't know what executable will be run? Do you just initialise every mapper?

Celery is based on the notion of tasks. Though represented by a class, a task is just a function. We have a MAUS task which represents the execution of a set of mappers.

So currently,

  • celeryd is run.
  • Instances of each Celery task are created.
  • When our MAUS task is created, it creates instances of every MAUS mapper and holds references to these.
  • Celery then invokes our custom Celery loader class.
  • Celery loader class reads MAUS configuration.
  • Celery loader class invokes birth on MAUS task which in turn invokes birth on the MAUS mappers.
  • The celery worker is then available for use by

Via this invokes the MAUS Celery task asynchronous "run" function, which passes the request (spill and mapper names) to Celery, which passes it to the RabbitMQ task queue, which, in turn, picks a Celery worker to use.

Can you either initialise on first receiving a spill or make a specific initialisation signal when we start a job?

  • initialise the workers
  • execute
  • hand the mapper names to the workers
  • hand the configuration json string to the workers
  • workers initialise the configuration
  • workers initialise the mappers
  • execute

This was my dynamically-configurable dream solution but I battled to get it to work to no avail. Celery creates a pool of task objects which service requests. These can have their state updated but only via the asynchronous function call method and there's no way of forcing a message (e.g. reinitialise yourself) to be broadcast to every task object in every worker's pool.

See foregoing commentary for full details.

Maybe requires a new table (or whatever) in CouchDB/MongoDB that includes the Configuration.

Celery doesn't use CouchDB/MongoDB or have any knowledge of it. That's a separate part of the processing pipeline used by creates an instance of the MAUS Celery task which acts as a client-side proxy. It invokes a function on this and passes it the spill. The return spill is then saved in the database.

The input-map loop is essentially:

WHILE input spill available:
 READ next spill from input
 SUBMIT spill to Celery.
 FOREACH Celery worker
  IF it has completed execution THEN
   GET result spill
   WRITE result spill to database.

The reduce-output loop is essentially (just now):
WHILE spills in database:
 READ spill from database
 EXECUTE reduce-output

Though we need more intelligent processing of spills in the database to distinguish reduced from non-reduced ones, new arrivals since the last iteration etc.
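The two loops can be sketched in plain Python, with a thread pool standing in for Celery and a list standing in for the database. All names here are illustrative assumptions. Unlike the pseudo-code, this simplified version submits every spill first and then collects the results in order; the "reduced" flag is one simple way to tell reduced spills from new arrivals.

```python
from concurrent.futures import ThreadPoolExecutor


def transform(spill):
    """Placeholder for the map pipeline a worker would run."""
    return dict(spill, mapped=True)


def input_map_loop(spills, database, max_workers=2):
    """Submit every spill, then store each result spill as its job completes."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending = [pool.submit(transform, spill) for spill in spills]
        for job in pending:
            database.append({"spill": job.result(), "reduced": False})


def reduce_output_loop(database, reducer):
    """Reduce only the spills not yet reduced, marking each one as done."""
    count = 0
    for record in database:
        if not record["reduced"]:
            reducer(record["spill"])
            record["reduced"] = True
            count += 1
    return count
```

Calling reduce_output_loop repeatedly only touches records added since the previous call, which is the behaviour the last paragraph asks for.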

There are two different things. You can actually run one without the other. Celery and the intermediary DB. Celery just processes spills but has no idea how they are stored later.

Add a process UUID or equivalent (ack uuid library bug that came up a while back) to the data tree and watch for a change? Master node changes the UUID every time it is rerun... and triggers re-initialisation.

Probably this means the task initialisation should be done in rather than in mausloader? Just sounds pretty painful/wrong to do it the other way. Sorry to interfere.

As it currently stands a user will have to

  • Create their configuration
  • Ensure this configuration is available on all nodes they will run Celery workers on.
  • Ensure it's also available to
  • Start up the worker nodes using celeryd.
  • Run their job, which'll invoke

If they change configuration then they'll have to copy this to the nodes, and manually shut down and restart the Celery workers.

The Celery workers know nothing about and do not use it at all. runs in a different process (or even a different machine) to the Celery workers. serves as a client to the Celery worker services. <---> Celery client API <---> RabbitMQ <---> Celery worker <---> MAUS Celery task.

What would be possible is a shell, or Python script/function that would:

  • Take a list of all nodes on which MAUS is installed and Celery is to be run.
  • Connect to each node in turn.
  • scp (or ftp or whatever) the configuration to these nodes.
  • Invoke "celeryd" to start up the Celery workers.
  • And, likewise, to shutdown or to restart the workers.
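A rough sketch of such a helper, which only builds the command lines and does not execute them. The function name, the remote directory argument and the pkill pattern are assumptions; in particular a real script should target the worker's PID rather than match on "celeryd", and would need to background or daemonise the worker rather than run it in the foreground over ssh.

```python
def worker_commands(nodes, config_file, remote_dir, action="start"):
    """Return command lines (argument lists) for each node; nothing is run here."""
    if action not in ("start", "stop", "restart"):
        raise ValueError("unknown action: %s" % action)
    commands = []
    for node in nodes:
        if action in ("stop", "restart"):
            # TERM asks the worker to shut down (placeholder pattern match).
            commands.append(["ssh", node, "pkill -TERM -f celeryd"])
        if action in ("start", "restart"):
            # Copy the configuration across, then start a worker.
            commands.append(["scp", config_file, "%s:%s" % (node, remote_dir)])
            commands.append(["ssh", node, "celeryd --loglevel=INFO"])
    return commands
```

Each returned list could be handed to subprocess.call in turn, e.g. for the nodes maus and maus2, which keeps the node list and the configuration file in one place.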

How do the Celery workers get the spill information off the database? What I'm getting at is I am told there is this problem like we can't communicate anything with worker nodes so we can't force them to reinitialise.

Ah, OK, the way I was approaching it was to avoid overloading the MAUS task's asynchronous function, i.e. having it both process spills and be responsible for reinitialising a task. I was also trying to ensure that all task threads/processes were updated simultaneously so they were in sync.

But we do have a communication route all figured out, it's CouchDB (or MongoDB). So clearly we can communicate with the worker nodes, because they can get at the spill information and process it. So what stops us using this communication route to push configuration information to the workers?

Stupid example, not the way to do it, but say the JSON document for the spill looks like:

 "configuration":<string holding output of json.dumps(configuration)>

Then in map.process() I stick a line like
def process(self, json_document):
    if json_document["process_id"] != self.my_process_id:
        my_config = json.loads(json_document["configuration"])
        self.birth(my_config)

Okay, you wouldn't call death() in the middle of process() like that - but in principle this should work right?

Could alternatively just read the new configuration from the database.

It would have to be something akin to the following in the MAUS Celery task's asynchronous "run" function:

def run(self, json_document):
  if json_document["process_id"] != my_process_id:
    my_config = json.loads(json_document["configuration"])
    Invoke death() on current map.
    Create new map.
    Invoke birth( my_config ) on map.
  return map.process(spill)

So I would do it differently, I would have a separate table on the DB that holds this information and then before calling map.process() I check to see if the map needs reinitialising - or something similar.

Now, if you don't mind overloading the single task function as above (so it both processes a spill and, when needed, forces rebirthing of the mappers) then it should be possible.

While you can force all workers to receive this message (as Celery allows you to name the worker nodes) there doesn't seem to be a way to specify a specific thread/process being run by a single worker node. So there's no way to update all actual Celery task objects that handle requests at the same time, they'd only update on an individual-by-individual basis when they receive a spill that differs in process_id from the current one. If this isn't an issue (since they'll all be guaranteed to be updated before they process their next spill) then this should be possible.
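A runnable sketch of the per-task check described above, outside Celery. ToyMap and ReconfigurableTask are hypothetical names; the "process_id" and "configuration" keys follow the example in the discussion. Each task object rebirths on its own, the first time it sees a spill with a new process_id.

```python
import json


class ToyMap(object):
    """Hypothetical map exposing the birth/process/death life cycle."""

    def __init__(self):
        self.config = None

    def birth(self, config):
        self.config = config

    def process(self, spill):
        return dict(spill, processed_with=self.config)

    def death(self):
        self.config = None


class ReconfigurableTask(object):
    """Rebirths its map whenever a spill arrives with a new process_id."""

    def __init__(self):
        self.process_id = None
        self.map = ToyMap()
        self.rebirths = 0

    def run(self, json_document):
        if json_document["process_id"] != self.process_id:
            config = json.loads(json_document["configuration"])
            self.map.death()      # shut down the current map
            self.map = ToyMap()   # create a new map
            self.map.birth(config)
            self.process_id = json_document["process_id"]
            self.rebirths += 1
        return self.map.process(json_document)
```

Because the check happens before processing, a task object can never process a spill with a stale configuration, even though different pool members update at different times.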

I think it is the correct way to go. There is an issue that after we restart the process everything will be slowed up while the workers re-initialise...

I should also say - prioritise appropriately, I don't think this is an urgent task. But I hope that it will be better this way.


Updated by Jackson, Mike over 12 years ago

Another (final) look at dynamic reconfiguration:

  • When "celeryd -c 1" is called, a Task's __init__ constructor is called twice. broadcast calls can only access the MainProcess task instance.
  • Task invocations via run only access the PoolWorker task instances.
  • says to use TERM to kill processes allowing them to complete work-in-progress tasks first.
  • If a child process is killed via "kill -s TERM PID" then a new PoolWorker is created by Celery.
  • This new PoolWorker has the current state of the MainProcess. So updating the MainProcess can have an effect on all the PoolWorkers if we force them all to restart.


Supports broadcast events:

from celery.task.control import broadcast
broadcast("configure_maus", arguments={"config": 456}, reply=True)
[{u'': {u'ok': u'configured'}}]

broadcast("reset_pool", reply=True)
[{u'': {u'ok': u'reset'}}]

Task has configuration variable with default 1. Running
./bin/examples/ -type_of_dataflow=multi_process

shows this value printed in the Celery terminal. Running
broadcast("configure_maus", arguments={"configuration": 123})

changes the configuration value but running
./bin/examples/ -type_of_dataflow=multi_process

shows the value is still 1. Now if we reset the pool:
broadcast("reset_pool", reply=True)

and then run the client, the configuration variable can now be seen to be 123.
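The behaviour observed above can be modelled in a few lines of plain Python. This is a toy model of the observation, not Celery code; the class and method names are assumptions that mirror the broadcast names used in the experiment.

```python
import copy


class ToyWorkerNode(object):
    """Models one worker node: a main task instance plus copies in a pool."""

    def __init__(self, concurrency, configuration=1):
        self.main_task = {"configuration": configuration}
        self.concurrency = concurrency
        self.pool = []
        self.reset_pool()

    def configure_maus(self, configuration):
        # A broadcast only reaches the main process' task instance.
        self.main_task["configuration"] = configuration

    def reset_pool(self):
        # New pool processes start from the main process' current state.
        self.pool = [copy.deepcopy(self.main_task) for _ in range(self.concurrency)]

    def run(self):
        # Task invocations are served by pool copies, never by the main instance.
        return [task["configuration"] for task in self.pool]
```

In the model, configure_maus(123) alone leaves run() reporting the default of 1, and only a following reset_pool() makes the pool report 123, matching the sequence seen in the Celery terminal.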

Occasionally a stack trace appears in the Celery window, after a return from the broadcast functions. This sometimes arises when using "kill" on the child processes too:

[2012-01-17 15:03:46,534: ERROR/MainProcess] Consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
  File "/home/michaelj/maus-bzr/maus/third_party/install/lib/python2.7/site-packages/celery-2.4.6-py2.7.egg/celery/worker/", line 310, in start
  File "/home/michaelj/maus-bzr/maus/third_party/install/lib/python2.7/site-packages/celery-2.4.6-py2.7.egg/celery/worker/", line 326, in consume_messages
  File "/home/michaelj/maus-bzr/maus/third_party/install/lib/python2.7/site-packages/kombu-2.0.0-py2.7.egg/kombu/", line 194, in drain_events
    return self.transport.drain_events(self.connection, **kwargs)
  File "/home/michaelj/maus-bzr/maus/third_party/install/lib/python2.7/site-packages/kombu-2.0.0-py2.7.egg/kombu/transport/", line 227, in drain_events
    return connection.drain_events(**kwargs)
  File "/home/michaelj/maus-bzr/maus/third_party/install/lib/python2.7/site-packages/kombu-2.0.0-py2.7.egg/kombu/transport/", line 59, in drain_events
    return self.wait_multi(self.channels.values(), timeout=timeout)
  File "/home/michaelj/maus-bzr/maus/third_party/install/lib/python2.7/site-packages/kombu-2.0.0-py2.7.egg/kombu/transport/", line 65, in wait_multi
    chanmap.keys(), allowed_methods, timeout=timeout)
  File "/home/michaelj/maus-bzr/maus/third_party/install/lib/python2.7/site-packages/kombu-2.0.0-py2.7.egg/kombu/transport/", line 124, in _wait_multiple
    channel, method_sig, args, content = read_timeout(timeout)
  File "/home/michaelj/maus-bzr/maus/third_party/install/lib/python2.7/site-packages/kombu-2.0.0-py2.7.egg/kombu/transport/", line 97, in read_timeout
    return self.method_reader.read_method()
  File "build/bdist.linux-i686/egg/amqplib/client_0_8/", line 221, in read_method
    raise m
error: [Errno 4] Interrupted system call


Updated by Jackson, Mike over 12 years ago

Did various worker reset tests:

Defined simple task as:

    def run(self, workers, spill, client_id = "Unknown", spill_id = 0):
        print "RUN!"
        print "Sleep..."
        import time
        time.sleep(15)
        print "...awake!"
        return spill

Run which blocks as the above task sleeps.
Send Celery "broadcast":
from celery.task.control import broadcast

Worker(s) display(s):
[2012-01-19 14:38:53,832: WARNING/PoolWorker-1] RUN!
[2012-01-19 14:38:53,834: WARNING/PoolWorker-1] Sleep...
[2012-01-19 14:38:55,870: WARNING/MainProcess] Got shutdown from remote.
[2012-01-19 14:39:08,836: WARNING/PoolWorker-1] ...awake!

then exit(s). So they wait for the current task to complete.

Try again, but instead of a broadcast use,


Celery throws an error:
[2012-01-19 13:30:23,518: ERROR/MainProcess] Task mauscelery.maustasks.MausGenericTransformTask[24ae9500-b3ef-4d37-b142-9b1923a46fad] raised exception: WorkerLostError('Worker exited prematurely.',)
Traceback (most recent call last):
  File "/home/michaelj/maus-bzr/maus/third_party/install/lib/python2.7/site-packages/celery-2.4.6-py2.7.egg/celery/concurrency/processes/", line 610, in _join_exited_workers
    raise WorkerLostError("Worker exited prematurely.")
WorkerLostError: Worker exited prematurely.
Traceback (most recent call last):
  File "/home/michaelj/maus-bzr/maus/third_party/install/lib/python2.7/site-packages/celery-2.4.6-py2.7.egg/celery/concurrency/processes/", line 610, in _join_exited_workers
    raise WorkerLostError("Worker exited prematurely.")
WorkerLostError: Worker exited prematurely.

The client shows the task fails:
TRANSFORM: waiting for transform jobs to complete
 Spill 0 task 24ae9500-b3ef-4d37-b142-9b1923a46fad FAILED 
Exception in worker: Worker exited prematurely. 
Traceback in worker: None 

Same behaviour arises if using "kill -TERM POOLPID" or "kill -HUP POOLPID" or sending a Celery "broadcast":
from celery.task.control import broadcast
broadcast("reset_pool", reply=True)

Instead of "os.kill" the following can be used:
        for pid in["processes"]:
            print "Terminated %s " %pid

With the same behaviour as above.

Replace use of "os.kill" to kill pool worker processes in my "reset_pool" Celery broadcast function with:


# Terminal 1 - Celery. Start a worker.
$ celeryd -c 1 -n worker1
# Terminal 2 - ipython. Broadcast to get the pool info.
$ broadcast("get_process_pool", reply=True)
[{u'worker1': {u'master_name': u'MainProcess',
   u'master_pid': 4444,
   u'pool_pids': [4457]}}]
# Terminal 3 - ps.
$ ps -a
 4444 pts/3    00:00:02 celeryd
 4457 pts/3    00:00:00 celeryd
# Terminal 4 -
$ ./bin/examples/ -type_of_dataflow=multi_process -doc_store_class="docstore.MongoDBDocumentStore.MongoDBDocumentStore" 
# Terminal 1 - Celery
[2012-01-19 14:32:21,708: WARNING/PoolWorker-1] RUN!
[2012-01-19 14:32:21,709: WARNING/PoolWorker-1] Sleep...
# Terminal 2.
broadcast("reset_pool", reply=True)
[2012-01-19 14:32:22,426: WARNING/MainProcess] Resetting pool...
[2012-01-19 14:32:22,427: WARNING/MainProcess] Stopping pool...
# Note how the reset_pool call has blocked in the Celery worker, this
# blocks in the pool.stop() call.
# Terminal 3.
$ ps -a
 4444 pts/3    00:00:02 celeryd
 4457 pts/3    00:00:00 celeryd
# Note how the process IDs have not changed.
# Terminal 1.
[2012-01-19 14:32:36,714: WARNING/PoolWorker-1] ...awake!
[2012-01-19 14:32:36,740: WARNING/MainProcess] Starting pool...
[2012-01-19 14:32:36,802: WARNING/MainProcess] ...reset: {'status': 'ok'}
# Note how, now the task has completed, reset_pool's pool.stop() call
# unblocks and completes and the pool is started.
# Terminal 2
$ broadcast("get_process_pool", reply=True)
[{u'worker1': {u'master_name': u'MainProcess',
   u'master_pid': 4444,
   u'pool_pids': [4482]}}]
# Note the new pool process ID.
# Terminal 3.
$ ps -a
 4444 pts/3    00:00:02 celeryd
 4482 pts/3    00:00:00 celeryd
# Ditto.
# Terminal 4.
 Spill 0 task a051588c-273c-4b33-b0a4-e097c850488a OK 
# Task completes OK from the perspective of

Try with concurrency = 2:
$ celeryd -c 2 -n worker1

Issuing the "reset_pool" broadcast sometimes gives:
Traceback (most recent call last):
  File "/home/michaelj/maus-bzr/maus/third_party/install/lib/python2.7/site-packages/celery-2.4.6-py2.7.egg/celery/worker/", line 286, in process_task
    self.loglevel, self.logfile)
  File "/home/michaelj/maus-bzr/maus/third_party/install/lib/python2.7/site-packages/celery-2.4.6-py2.7.egg/celery/app/task/", line 708, in execute
    request.execute_using_pool(pool, loglevel, logfile)
  File "/home/michaelj/maus-bzr/maus/third_party/install/lib/python2.7/site-packages/celery-2.4.6-py2.7.egg/celery/worker/", line 359, in execute_using_pool
  File "/home/michaelj/maus-bzr/maus/third_party/install/lib/python2.7/site-packages/celery-2.4.6-py2.7.egg/celery/concurrency/", line 101, in apply_async
  File "/home/michaelj/maus-bzr/maus/third_party/install/lib/python2.7/site-packages/celery-2.4.6-py2.7.egg/celery/concurrency/processes/", line 789, in apply_async
    assert self._state == RUN

Then the worker doesn't continue processing existing spills and just hangs.

Another attempt is to use pool.shrink to reduce the pool to 0 then pool.grow to grow it again. However if a task is running this throws:

ValueError("Can't shrink pool. All processes busy!",)

but will still reduce the pool by all the unused processes, so the client would then have to repopulate the pool. Unhappy with using this.

727 has reset_pool_term (uses os.kill), reset_pool_term_job (uses pool.terminate_job), reset_pool_stopstart (uses pool.stop and pool.start), reset_pool_resize (uses pool.shrink and pool.grow)

So, what to do?

  • Using os.kill or pool.terminate_job works, but any currently executing tasks are interrupted and return a FAILED state to the client that submitted them.
  • Using pool.stop and pool.start allows current tasks to continue to execute, but gives curious issues if a worker manages more than one process (e.g. celeryd -c 2) and leaves the worker in an odd state. Any problems are not detectable in the client that submitted the tasks, which just waits for tasks that will never complete. It works fine if there are no currently executing jobs when the request is issued.
  • Using pool.shrink and pool.grow throws a ValueError if all processes are busy, and otherwise leaves the client to repopulate the pool.

The code that does the MAUS-specific reconfiguration is the same regardless. The decision determines how clients might be coded. Prefer reset_pool_term_job, with the client responsible for ensuring all its tasks have completed. It leaves the worker in a reusable state.
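
A minimal sketch of what that client responsibility could look like: only ask for a pool reset when none of the submitted tasks is still running. The helper reset_if_idle, the FakeResult class and the send_reset callable are hypothetical. send_reset stands in for whatever issues the reset broadcast, and the result objects are only assumed to offer a ready() method, as Celery's AsyncResult does.

```python
def reset_if_idle(results, send_reset):
    """Send the pool reset only when no submitted task is still running.

    results maps task ID -> result object offering ready().
    send_reset is a callable that issues the reset and returns the replies.
    """
    pending = sorted(task_id for task_id, result in results.items()
                     if not result.ready())
    if pending:
        # Resetting now would interrupt these tasks and mark them FAILED.
        return {"reset": False, "pending": pending}
    return {"reset": True, "replies": send_reset()}


class FakeResult(object):
    """Stand-in for a task result, for trying the helper without Celery."""

    def __init__(self, done):
        self.done = done

    def ready(self):
        return self.done


if __name__ == "__main__":
    tasks = {"spill-0": FakeResult(True), "spill-1": FakeResult(False)}
    print(reset_if_idle(tasks, lambda: [{"worker1": {"status": "ok"}}]))
```

Returning the list of pending task IDs, rather than blocking, leaves it to the client to decide whether to wait or give up.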


Updated by Jackson, Mike over 12 years ago

Tested using a broadcast to change the transform from MapPyDoNothing to MapPyPrint then issuing a broadcast to terminate the pool workers. The new pool workers have the updated transform.

Commit "728": - clean-up and support for workernames in both StringType and UnicodeType form.

TODO - clean up the code and add error handling, then update to send a broadcast with the transform names and the current configuration, followed by a broadcast to reset the task pools.

This means the following, proposed earlier, no longer needs to be handled by a worker/considered:

  • Add process UUID to data tree and detect change.
  • Master node changes UUID every time it's run and this triggers re-init.

The broadcast approach is more efficient, as the alternative (via the asynchronous task call) would incur sending the configuration and worker list with every spill to ensure that all the worker nodes (and their processes) are eventually updated.


Updated by Jackson, Mike over 12 years ago

Cleaned up code and added error handling as above. Also,

  • Added wiki pages documenting Celery and RabbitMQ (MAUSCelery, MAUSRabbitMQ) setup, configuration and monitoring info.
  • Added wiki page on error detection and recovery for Celery (MAUSCeleryRabbitMQRecovery) - can be used as basis for control room doc related to this.


After a telcon with Chris T, there are issues with the current worker node setup as implemented:

  • I ass-umed that not calling death would not be an issue in the approach as implemented. Since the sub-processes are killed and respawned with a new configuration, this is analogous to a CTRL-C to terminate if running the single-threaded
  • But some transforms may do cleaning up of files and the like in death.
  • Also, as mappers are created in the main process and copied to the sub-processes (standard Celery approach) if a mapper creates files then the file references will be shared across all sub-processes.
  • There is no way to call death in the sub-processes since there is no way for them to capture easily when they're about to die (see - there is a solution but I don't understand it that well and it seems a bit hacky).

So approach version 3 to Celery has been implemented. This:

  • Retains broadcasts to ensure all workers receive the configuration and transform names.
  • Main process holds the configuration and transform names but does not create these.
  • When a sub-process starts up it creates the transform it needs and births it with the current configuration. If a sub-process dies and a new one spawned then it will automatically create transforms with the up to date configuration.
  • For birth a broadcast is used to send the new configuration and new transform names. The main process which handles broadcasts then fires requests into its process pool and keeps track of which sub-processes service the request. Once all sub-processes have updated then the main process knows that all sub-processes have updated (from an idea in This may take a few seconds if there is more than one sub-process. If time proves to be a major problem then multiple celeryd workers can be used, each managing a single sub-process, rather than one managing multiple sub-processes per host.
  • When a sub-process receives a birth call it deaths any existing transform, then creates and births the new ones.
  • A death broadcast event is supported so can death the transforms when it's done with them.
  • There's now no need for the sub-processes to be killed and respawned for new configuration to take effect.
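
The sub-process side of this can be sketched as follows. This is an illustration under assumptions, not the MAUS code: the class names and the create_transform factory are hypothetical, and the transforms are only assumed to offer birth(configuration) and death().

```python
class SubProcessTransforms(object):
    """Holds the transforms owned by one worker sub-process."""

    def __init__(self, create_transform):
        # create_transform(name) -> object with birth(config) and death().
        # Hypothetical factory standing in for however mappers are built.
        self._create = create_transform
        self._transforms = []

    def birth(self, names, configuration):
        """Death any existing transforms, then create and birth new ones."""
        self.death()
        created = []
        for name in names:
            transform = self._create(name)
            transform.birth(configuration)
            created.append(transform)
        self._transforms = created

    def death(self):
        """Death current transforms so they can clean up files and the like."""
        for transform in self._transforms:
            transform.death()
        self._transforms = []


class RecordingTransform(object):
    """Toy transform that records its life-cycle calls in a shared log."""

    def __init__(self, name, log):
        self.name = name
        self.log = log

    def birth(self, configuration):
        self.log.append(("birth", self.name, configuration))

    def death(self):
        self.log.append(("death", self.name))


if __name__ == "__main__":
    log = []
    state = SubProcessTransforms(lambda name: RecordingTransform(name, log))
    state.birth(["MapPyDoNothing"], "config-1")
    state.birth(["MapPyPrint"], "config-2")  # deaths MapPyDoNothing first
    for entry in log:
        print(entry)
```

Because each sub-process builds its own transforms, any files a transform opens belong to that sub-process alone rather than being copied from the main process.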

Apologies for the verboseness, I want to capture the design rationale. In one sentence it's "Basically what Chris R proposed but using broadcast to update in one go rather than having to send new configuration alongside a spill with every request".


TODO - make more modular and update the monitoring and error wiki pages with up-to-date examples.


Updated by Jackson, Mike over 12 years ago

739 to 744

A lot of cleaning up of Celery code:

  • Client sends unique configuration ID and workers update if this changes.
  • Modularised and added unit tests for the modules.
  • Added MapPyTestMap dummy mapper for testing MapPyGroup and Celery code (can be configured to return False or throw exceptions during birth, process, death)
  • Simplified error and status information returned from Celery worker. No need for caller to know about sub-process IDs.
  • Added test class. All tests skipped if no Celery workers available. If one or more available then tests dynamic reconfiguration methods. End-to-end versions of the unit tests, basically.
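
The "skip if no workers" behaviour of the test class can be sketched with plain unittest. The available_workers probe is hypothetical and always reports no workers here; a real probe might broadcast a ping and collect the replies.

```python
import unittest


def available_workers():
    """Hypothetical probe for running Celery workers (none in this sketch)."""
    return []


class DynamicReconfigurationTest(unittest.TestCase):
    """End-to-end style tests that only make sense with live workers."""

    def setUp(self):
        self.workers = available_workers()
        if not self.workers:
            # Skip rather than fail, so the suite passes on hosts without Celery.
            self.skipTest("No Celery workers available")

    def test_birth_updates_all_workers(self):
        # Would exercise the birth broadcast against self.workers.
        self.assertTrue(len(self.workers) > 0)


if __name__ == "__main__":
    unittest.main()
```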

Updated by Jackson, Mike over 12 years ago

Timeout tests with MapCppSimulation (as it takes a while to start up):

import json
from celery.task.control import broadcast
from Configuration import Configuration

# Get the current MAUS configuration as a JSON document.
configuration = Configuration()
json_config_doc = configuration.getConfigJSON()
json_config_dictionary = json.loads(json_config_doc)
# Ask all workers to birth these transforms with that configuration.
workers = ["MapPyBeamMaker", "MapCppSimulation", "MapCppTrackerDigitization"]
results = broadcast("birth", arguments={"transform":workers, "configuration":json_config_doc, "config_id": 123}, reply=True)
print results

results is [], though the workers update fine, so the broadcast call is returning to the client before the workers have replied.

Try broadcast timeout argument of 500s,

results = broadcast("birth", arguments={"transform":workers, "configuration":json_config_doc, "config_id": 123}, reply=True, timeout=500)

But now waits for 500s regardless. Add use of limit, which specifies how many workers to await responses from:

results = broadcast("birth", arguments={"transform":workers, "configuration":json_config_doc, "config_id": 124}, reply=True, timeout=500, limit=2)

But need to pick a very high timeout - 500s on my VMs wasn't long enough for 2 Celery workers each with 2 sub-processes. Could use get_maus_configuration to check when configuration has been updated. Or use of broadcast callback argument, to invoke method when worker returns.
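
The polling alternative could look something like the sketch below, which avoids guessing one large timeout. wait_for_config and fetch_config_ids are hypothetical names. fetch_config_ids stands in for a call that asks each worker for its current configuration (e.g. via get_maus_configuration) and is assumed to return a dict of worker name to config ID.

```python
import time


def wait_for_config(fetch_config_ids, expected_id,
                    poll_interval=1.0, max_wait=600.0):
    """Poll until every worker reports expected_id, or give up after max_wait.

    fetch_config_ids() -> dict mapping worker name to its current config ID.
    Returns True once all workers match, False if max_wait seconds pass first.
    """
    deadline = time.time() + max_wait
    while True:
        ids = fetch_config_ids()
        # An empty reply means no worker answered, which is not success.
        if ids and all(value == expected_id for value in ids.values()):
            return True
        if time.time() >= deadline:
            return False
        time.sleep(poll_interval)


if __name__ == "__main__":
    replies = iter([{"worker1": 123}, {"worker1": 124}])
    print(wait_for_config(lambda: next(replies), 124, poll_interval=0.01))
```

The client then returns as soon as the slowest worker has updated, rather than always waiting for the full timeout.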

Server-side, uses Pool.apply_async, which takes no timeout, and AsyncResult.get, which does take a timeout (and throws a TimeoutError if exceeded) but if omitted the timeout is None. So no risk of timeout from worker process to sub-process updates.
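
To illustrate the timeout behaviour, here is a small sketch using the standard library's multiprocessing.pool.ThreadPool, which offers the same apply_async and get calls. It is a stand-in chosen to keep the example self-contained, not the pool class Celery uses; slow_birth and birth_with_timeout are hypothetical names.

```python
import multiprocessing
import time
from multiprocessing.pool import ThreadPool


def slow_birth(seconds):
    """Stand-in for a transform birth that takes a while."""
    time.sleep(seconds)
    return "born"


def birth_with_timeout(pool, seconds, timeout=None):
    """Submit slow_birth and wait for it; timeout=None waits indefinitely."""
    async_result = pool.apply_async(slow_birth, (seconds,))  # no timeout here
    try:
        return async_result.get(timeout)
    except multiprocessing.TimeoutError:
        return "timed out"


if __name__ == "__main__":
    pool = ThreadPool(processes=1)
    print(birth_with_timeout(pool, 0.2))                # waits as long as needed
    print(birth_with_timeout(pool, 0.5, timeout=0.05))  # gives up early
    pool.close()
    pool.join()
```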


Updated by Rogers, Chris over 12 years ago

Change the datacard "simulation_geometry_filename" to "Test.dat" and it should speed up initialisation of MapCppSimulation...


Updated by Jackson, Mike over 12 years ago

Thanks, that's much faster.


Updated by Jackson, Mike over 12 years ago

749 - MapPyGroup birth and death use ErrorHandler.

750 - now sets:

    ErrorHandler.DefaultHandler().on_error = 'raise'

when worker sub-processes are spawned.

Together these changes mean more informative messages in the Celery terminal window and also back to for birth/death failures. Also, a setting of 'halt' causes ErrorHandler to kill the Celery worker sub-process and Celery will just spawn a new one, which is messy.

See also concern of #896


Updated by Jackson, Mike over 12 years ago

To 776

  • Added client-Celery worker MAUS version checks.
  • Extended test coverage to 100%.
  • Added ./bin/utilities/ a simple "front-end" to celeryd command-line invocation.

Updated by Jackson, Mike over 12 years ago

Last week's tasks:

Commits up to 790

Distributed task queue #704

  • Removed spill ID from Celery execute_transform function as it doesn't mean much. spill_num, if present, will be in the spill

Updated by Jackson, Mike over 12 years ago

Commits up to 797

  • Changed so can have death-then-birth workers without changing configuration.
  • New approach to handling how Celery sub-processes detect if they've already been updated via sending PIDs from main process.
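
One possible reading of the PID-based approach, as a simulation: the main process keeps the set of pool PIDs still to be updated and sends it with each request, and whichever sub-process picks the request up only updates if its own PID is in that set. This is an assumption about the mechanism, not a copy of the committed code; update_all, make_fake_pool and submit are hypothetical names.

```python
import itertools


def update_all(pool_pids, submit, max_requests=100):
    """Fire requests until every PID in pool_pids has reported an update.

    submit(pending) hands the set of not-yet-updated PIDs to whichever
    sub-process picks up the request and returns (pid, updated).
    Returns the number of requests fired.
    """
    pending = set(pool_pids)
    fired = 0
    while pending:
        if fired >= max_requests:
            raise RuntimeError("Sub-processes not updated: %s" % sorted(pending))
        pid, updated = submit(frozenset(pending))
        fired += 1
        if updated:
            pending.discard(pid)
    return fired


def make_fake_pool(schedule, on_update):
    """Simulated pool: schedule lists which PID services each request in turn."""
    turns = itertools.cycle(schedule)

    def submit(pending):
        pid = next(turns)
        if pid in pending:
            on_update(pid)   # here the sub-process would death/birth transforms
            return pid, True
        return pid, False    # already updated, nothing to do

    return submit


if __name__ == "__main__":
    updated = []
    submit = make_fake_pool([4457, 4457, 4482], updated.append)
    print(update_all([4457, 4482], submit), updated)
```

Since the pool decides which sub-process services a request, the same sub-process may be hit more than once before the others are reached, hence the loop and the max_requests guard.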

Updated by Jackson, Mike over 12 years ago

  • Status changed from Open to Closed
  • % Done changed from 0 to 100

Updated by Rogers, Chris over 12 years ago

  • Target version changed from Future MAUS release to MAUS-v0.2.0
