MAUSCelery » History » Revision 20
Jackson, Mike, 02 February 2012 17:01
Celery configuration and monitoring¶
MAUS can be used with the Celery asynchronous distributed task queue to allow transform steps to be executed on multiple processors.
For full information on using Celery, see http://celeryproject.org/
Installation¶
Celery is automatically downloaded and installed when you build MAUS.
Configure a host to run a Celery worker¶
To configure a host to run a Celery worker:
- Ensure MAUS is available on the host you want to use as a Celery worker.
- Ensure you have run:
$ source env.sh
- If you have deployed RabbitMQ on a remote host, edit
src/common_py/mauscelery/celeryconfig.py
and change
BROKER_HOST = "localhost"
to specify the full hostname of the host on which RabbitMQ was deployed e.g. BROKER_HOST = "maus.epcc.ed.ac.uk"
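For example, assuming RabbitMQ was deployed on a host named maus.epcc.ed.ac.uk (as in the transcripts later on this page), the edited line in celeryconfig.py would read:

```python
# src/common_py/mauscelery/celeryconfig.py
# Point the worker at the remote RabbitMQ broker instead of localhost.
BROKER_HOST = "maus.epcc.ed.ac.uk"
```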
Start up a Celery worker¶
To start up a Celery worker, run:
$ celeryd -l INFO -n WORKER_ID
where WORKER_ID is a unique ID for the worker. You should provide one, as it makes monitoring easier when you are running many workers.
Specify the number of sub-processes¶
The worker will set up a number of sub-processes, depending on the number of CPUs available to your host. You can explicitly set the number of sub-processes via the -c flag e.g.
$ celeryd -l INFO -n WORKER_ID -c 2
You can specify as many sub-processes as you like but exceeding the number of CPUs available may cause performance to suffer.
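As a rule of thumb, matching the sub-process count to the CPU count avoids oversubscription. A minimal sketch (not part of MAUS) that picks a concurrency value for the -c flag:

```python
import multiprocessing

# One worker sub-process per CPU is a sensible default;
# exceeding the CPU count can hurt performance.
concurrency = multiprocessing.cpu_count()
print("Suggested flag: -c %d" % concurrency)
```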
Purge queued tasks¶
If any tasks are held by RabbitMQ awaiting despatch to workers, then the worker, when started, will immediately start to process them. If you want to purge this queue before your worker starts, use the --purge flag. This can be useful if one or more workers have run into problems and you don't want the backlog of pending tasks to be processed after you restart them. For example:
$ celeryd -l INFO -n WORKER_ID --purge
See also "Purge queued tasks" below.
Specify the logging level¶
The -l flag specifies the logging level of the worker. The options, aside from INFO, are DEBUG, WARNING, ERROR and CRITICAL e.g.
$ celeryd -l DEBUG -n WORKER_ID
The default logging format is:
[%(asctime)s: %(levelname)s/%(processName)s] %(message)s
You can specify another format in src/common_py/mauscelery/celeryconfig.py using a CELERYD_LOG_FORMAT variable. See the Python logging module and http://ask.github.com/celery/configuration.html#logging for more on logging.
Initialise the worker to execute MAUS tasks¶
By default, Celery workers use the default MAUS configuration and apply the MapPyDoNothing transform. They need to be explicitly configured to apply other transforms.
If you are running analyses using a client that uses Go.py then this configuration will be done by Go.py on your behalf.
If you are not using Go.py then you can do this manually (or in your own Python code) using the MAUS-specific commands described below in "Birth the worker transforms".
Monitoring and management¶
Celery provides a number of commands to allow inspection of Celery workers. Many of these can be invoked from the command line, via the celeryctl command, or from within Python. For full information, see http://ask.github.com/celery/userguide/monitoring.html
In the following, the names shown e.g. worker1, worker2 etc. are the worker names specified with the -n flag when starting the workers using celeryd.
In the invocations of the Celery inspect command in Python, a specific Celery worker can be specified e.g.
i = inspect("worker1")
where the worker name is that given to the -n flag when celeryd is invoked to start the worker.
Check live workers¶
Check for live Celery workers that have registered with RabbitMQ and are available for use:
$ celeryctl status
worker1: OK
worker2: OK

2 nodes online.
$ celeryctl inspect ping
<- ping
-> worker1: OK
    pong
-> worker2: OK
    pong
To specify a specific worker, use -d e.g.:
$ celeryctl inspect -d worker1 ping
<- ping
-> worker1: OK
From within Python, use:
>>> from celery.task.control import inspect
>>> i = inspect()
>>> i.ping()
{u'worker1': u'pong', u'worker2': u'pong'}
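The dictionary returned by ping() can be checked programmatically, for example before submitting work. A minimal sketch (the helper is hypothetical, not part of MAUS; the sample reply is taken from the transcript above):

```python
# Hypothetical helper: list workers that answered a Celery ping.
def live_workers(ping_reply):
    """Return the names of workers that replied 'pong'."""
    return sorted(name for name, answer in ping_reply.items()
                  if answer == u'pong')

# Sample reply, as returned by inspect().ping() above.
reply = {u'worker1': u'pong', u'worker2': u'pong'}
print(live_workers(reply))  # ['worker1', 'worker2']
```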
Check worker configuration¶
$ celeryctl inspect stats
<- stats
-> worker1: OK
    {u'autoscaler': {},
     u'consumer': {u'broker': {u'connect_timeout': 4,
                               u'hostname': u'127.0.0.1',
                               u'insist': False,
                               u'login_method': u'AMQPLAIN',
                               u'port': 5672,
                               u'ssl': False,
                               u'transport': u'amqp',
                               u'transport_options': {},
                               u'userid': u'maus',
                               u'virtual_host': u'maushost'},
                   u'prefetch_count': 8},
     u'pool': {u'max-concurrency': 2,
               u'max-tasks-per-child': None,
               u'processes': [11991, 11992],
               u'put-guarded-by-semaphore': True,
               u'timeouts': [None, None]},
     u'total': {}}
-> worker2: OK
    {u'autoscaler': {},
     u'consumer': {u'broker': {u'connect_timeout': 4,
                               u'hostname': u'maus.epcc.ed.ac.uk',
                               u'insist': False,
                               u'login_method': u'AMQPLAIN',
                               u'port': 5672,
                               u'ssl': False,
                               u'transport': u'amqp',
                               u'transport_options': {},
                               u'userid': u'maus',
                               u'virtual_host': u'maushost'},
                   u'prefetch_count': 8},
     u'pool': {u'max-concurrency': 2,
               u'max-tasks-per-child': None,
               u'processes': [21964, 21965],
               u'put-guarded-by-semaphore': True,
               u'timeouts': [None, None]},
     u'total': {}}
From within Python, use:
>>> from celery.task.control import inspect
>>> i = inspect()
>>> i.stats()
{...}
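The stats() reply can also be inspected programmatically, for example to read off each worker's broker host and pool concurrency. A minimal sketch (the helper is hypothetical; the sample reply is trimmed to the fields used, with values from the transcript above):

```python
# Hypothetical helper: summarise a Celery stats() reply.
def summarise_stats(stats_reply):
    """Map worker name -> (broker hostname, max pool concurrency)."""
    summary = {}
    for worker, stats in stats_reply.items():
        broker = stats[u'consumer'][u'broker'][u'hostname']
        concurrency = stats[u'pool'][u'max-concurrency']
        summary[worker] = (broker, concurrency)
    return summary

# Reply trimmed to the fields used above.
reply = {
    u'worker1': {u'consumer': {u'broker': {u'hostname': u'127.0.0.1'}},
                 u'pool': {u'max-concurrency': 2}},
}
print(summarise_stats(reply))  # {'worker1': ('127.0.0.1', 2)}
```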
Check registered tasks¶
Check the tasks that each worker can execute.
$ celeryctl inspect registered
<- registered
-> worker1: OK
    * celery.backend_cleanup
    * celery.chord
    * celery.chord_unlock
    * celery.ping
    * mauscelery.maustasks.MausGenericTransformTask
-> worker2: OK
    * celery.backend_cleanup
    * celery.chord
    * celery.chord_unlock
    * celery.ping
    * mauscelery.maustasks.MausGenericTransformTask
From within Python, use:
>>> from celery.task.control import inspect
>>> i = inspect()
>>> i.registered()
{u'worker1': [u'celery.backend_cleanup', u'celery.chord', u'celery.chord_unlock',
 u'celery.ping', u'mauscelery.maustasks.MausGenericTransformTask'],
 u'worker2': [u'celery.backend_cleanup', u'celery.chord', u'celery.chord_unlock',
 u'celery.ping', u'mauscelery.maustasks.MausGenericTransformTask']}
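To check that every worker has the MAUS task registered, and not just Celery's built-ins, the registered() reply can be filtered. A minimal sketch (the helper is hypothetical), using a reply in the shape shown above:

```python
# Hypothetical helper: pick out non-built-in (i.e. MAUS) tasks per worker.
def maus_tasks(registered_reply):
    """Map worker name -> tasks whose name does not start with 'celery.'."""
    return dict((worker, [t for t in tasks if not t.startswith(u'celery.')])
                for worker, tasks in registered_reply.items())

reply = {u'worker1': [u'celery.ping',
                      u'mauscelery.maustasks.MausGenericTransformTask']}
print(maus_tasks(reply))
# {'worker1': ['mauscelery.maustasks.MausGenericTransformTask']}
```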
Check task states¶
Check the tasks submitted for execution to a Celery worker by a client.
Check the tasks currently being executed by the worker:
$ celeryctl inspect active
<- active
-> worker1: OK
    * {u'args': u'(\'{MAUS_SPILL_DOCUMENT}\', \'maus.epcc.ed.ac.uk (13067)\', 13)',
       u'time_start': 1327503329.679438,
       u'name': u'mauscelery.maustasks.MausGenericTransformTask',
       u'delivery_info': {u'consumer_tag': u'2',
                          u'routing_key': u'celery',
                          u'exchange': u'celery'},
       u'hostname': u'worker1',
       u'acknowledged': True,
       u'kwargs': u'{}',
       u'id': u'7222138d-bb2d-4e1b-ba70-5c0f9e90aa08',
       u'worker_pid': 13059}
    * {...}
    ...
Note the worker_pid, which specifies the process ID of the Celery sub-process executing the task.
Check the tasks received by the worker but awaiting execution:
$ celeryctl inspect reserved
<- reserved
-> worker1: OK
    * {u'args': u'(\'{MAUS_SPILL_DOCUMENT}\', \'maus.epcc.ed.ac.uk (13067)\', 95)',
       u'time_start': None,
       u'name': u'mauscelery.maustasks.MausGenericTransformTask',
       u'delivery_info': {u'consumer_tag': u'2',
                          u'routing_key': u'celery',
                          u'exchange': u'celery'},
       u'hostname': u'worker1',
       u'acknowledged': False,
       u'kwargs': u'{}',
       u'id': u'ee1b3a88-58cc-4e26-b77d-4424ec9161d1',
       u'worker_pid': None}
    * {...}
    ...
Note that worker_pid, the process ID of the Celery sub-process executing the task, is currently None, since the task has not yet started executing.
From within Python, use:
>>> from celery.task.control import inspect
>>> i = inspect()
>>> i.active()
{u'worker1': [...], ...}
>>> i.reserved()
{u'worker1': [...], ...}
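The active() and reserved() replies share the same shape, so a single helper can summarise the load on each worker. A minimal sketch (the helper is hypothetical; the sample reply uses values from the transcripts above):

```python
# Hypothetical helper: count tasks per worker in an inspect() reply.
def task_counts(inspect_reply):
    """Map worker name -> number of tasks in an active()/reserved() reply."""
    return dict((worker, len(tasks))
                for worker, tasks in inspect_reply.items())

# Sample active() reply with one in-flight task on worker1.
active = {u'worker1': [{u'id': u'7222138d-bb2d-4e1b-ba70-5c0f9e90aa08',
                        u'worker_pid': 13059}]}
print(task_counts(active))  # {'worker1': 1}
```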
Get results¶
When a task is submitted to a Celery worker, a unique task ID is generated. Using this ID, and providing you have not collected them already, you can collect the results of a task using celeryctl e.g.
$ celeryctl result 95b3c56e-bea9-433d-aecd-920601c5ad20
{"digits": [{"tdc_counts": 1, "channel_id": {"fiber_number": 106, "tracker_number": 0, "type": "Tracker", "station_number": 1, "plane_number": 2}, "adc_counts": ...
Warning: results can only be collected once. If you do this when running an analysis it may cause problems with your client, which may wait forever for results that you have already collected. This command should only be used for debugging or for establishing why things are going wrong, for example if the Celery worker needs to be terminated and you want to recover results from RabbitMQ.
Purge queued tasks¶
This purges tasks currently awaiting dispatch from RabbitMQ, which can be useful if one or more workers have run into problems and you don't want the backlog of pending tasks to be processed after you restart them.
$ celeryctl purge
Purged 4 messages from 1 known task queue.
From within Python, use:
>>> from celery.task.control import discard_all
>>> discard_all()
4
Shut down workers¶
All workers can be shut down from within Python via:
>>> from celery.task.control import broadcast
>>> broadcast("shutdown")
Each worker will complete the tasks they are currently processing before shutting down.
Alternatively, you can use the Linux kill command e.g.:
$ ps -a
12614 pts/6    00:00:02 celeryd
12627 pts/6    00:00:00 celeryd
12628 pts/6    00:00:00 celeryd
$ kill -s TERM 12614
The process ID should be that of the main worker process. This will usually have the lowest process ID.
To kill the worker immediately, without waiting for currently processing tasks to complete, use:
$ kill -s KILL 12614
To kill all celeryd processes, use:
$ ps auxww | grep celeryd | awk '{print $2}' | xargs kill -9
MAUS-specific monitoring and management¶
These actions are supported by Celery workers running on top of MAUS.
Get worker MAUS configuration¶
Get the current MAUS configuration known to the workers, and the transforms that the workers will execute:
>>> from celery.task.control import broadcast
>>> broadcast("get_maus_configuration", reply=True)
[{u'worker1': {u'config_id': 123, u'configuration': u'{}',
   u'transform': u'MapPyDoNothing'}}]
where:
- config_id - a unique ID for the current worker configuration.
- configuration - the worker's MAUS configuration as forwarded by a client.
- transform - the transforms that the worker will apply to a spill. A list represents a MapPyGroup and nested lists represent nested MapPyGroups.
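The nested-list representation of transforms can be walked with a simple recursion. A minimal sketch (the helper and the "MapCppExample" name are hypothetical, not part of MAUS):

```python
# Hypothetical helper: flatten the nested-list transform representation,
# where a list is a MapPyGroup and nested lists are nested MapPyGroups.
def flatten_transforms(transform):
    """Return the transform names in execution order."""
    if isinstance(transform, list):
        names = []
        for element in transform:
            names.extend(flatten_transforms(element))
        return names
    return [transform]

# A MapPyGroup holding a transform and a nested MapPyGroup.
spec = ["MapCppExample", ["MapPyDoNothing"]]
print(flatten_transforms(spec))  # ['MapCppExample', 'MapPyDoNothing']
```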
Birth the worker transforms with a MAUS configuration¶
Update a worker with a new MAUS configuration and transforms. Existing transforms have death invoked. The new transforms are created and then birth is invoked upon them with the new configuration.
>>> from celery.task.control import broadcast
>>> broadcast("birth",
...     arguments={"configuration": """{"TOFconversionFactor":0.01}""",
...                "transform": "MapPyDoNothing",
...                "config_id": 123},
...     reply=True)
[{u'maus.epcc.ed.ac.uk': {u'7692': {u'status': u'ok'},
  u'7693': {u'status': u'ok'}}}]
where:
- config_id - a unique ID for the worker configuration. For the update to be applied this must be different to that currently held by the workers and returned by get_maus_configuration above.
- configuration - the MAUS configuration.
- transform - the transforms that the worker will apply to a spill. A list represents a MapPyGroup and nested lists represent nested MapPyGroups.
The status returned shows the worker IDs and the sub-process(es) within the worker, along with whether the operation was successfully invoked within the sub-processes.
If birth is called twice with the same config_id then this action does nothing and the status returned is:
[{u'maus.epcc.ed.ac.uk': {}}]
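The reply can be checked programmatically: a successful update reports 'ok' for every sub-process, while a no-op (e.g. a repeated config_id) reports an empty dictionary per worker. A minimal sketch (the helper is hypothetical; the sample replies are those shown above):

```python
# Hypothetical helper: classify a "birth"/"death" broadcast reply.
def update_applied(reply):
    """True if every worker reported 'ok' for all of its sub-processes."""
    for worker_reply in reply:
        for statuses in worker_reply.values():
            if not statuses:  # empty dict: no-op (e.g. repeated config_id)
                return False
            for status in statuses.values():
                if status.get(u'status') != u'ok':
                    return False
    return True

ok = [{u'maus.epcc.ed.ac.uk': {u'7692': {u'status': u'ok'},
                               u'7693': {u'status': u'ok'}}}]
noop = [{u'maus.epcc.ed.ac.uk': {}}]
print(update_applied(ok), update_applied(noop))  # True False
```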
Death the worker transforms¶
Invoke death on the workers' transforms:
>>> from celery.task.control import broadcast
>>> broadcast("death", reply=True)
[{u'maus.epcc.ed.ac.uk': {u'7692': {u'status': u'ok'},
  u'7693': {u'status': u'ok'}}}]
The status returned shows the worker IDs and the sub-process(es) within the worker, along with whether the operation was successfully invoked within the sub-processes.
If death is called twice then this action does nothing and the status returned is:
[{u'maus.epcc.ed.ac.uk': {}}]
Get information on worker processes¶
Get a list of the worker node processes and their child processes (child processes are responsible for executing jobs):
>>> from celery.task.control import broadcast
>>> broadcast("get_process_pool", reply=True)
[{u'worker1': {u'master_name': u'MainProcess', u'master_pid': 12614,
   u'pool_pids': [12627, 12628]}}]
The sub-process IDs correspond to those visible in the processes field of the document returned by the celeryctl inspect stats command, described in "Check worker configuration" above, and to those visible if running the Linux ps -a command e.g.
$ ps -a
12614 pts/6    00:00:02 celeryd
12627 pts/6    00:00:00 celeryd
12628 pts/6    00:00:00 celeryd
Restart the workers¶
To restart Celery workers, run:
>>> from celery.task.control import broadcast
>>> broadcast("restart_pool", reply=True)
[{u'worker1': {u'status': u'ok'}}]
This instructs the Celery worker to terminate its sub-processes and spawn new ones. These new ones will inherit any updated configuration or transforms sent using birth above.
In the celeryd window you might see this:
Consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
...
error: [Errno 4] Interrupted system call
This can be ignored. If in doubt, you can use get_maus_configuration above to ensure that the workers have the new configuration.
Note: Celery 2.5 supports a broadcast("pool_restart") command which has the same intent as the above. At present MAUS uses Celery 2.4.6, which is the latest version available via easy_install.