
h1. Celery configuration and monitoring 

 {{>toc}} 

 MAUS can be used with the Celery asynchronous distributed task queue (http://celeryproject.org/) to allow transform (map) steps to be executed on multiple processors in parallel. 

 For full information on using Celery, see the Celery web site and: 

 * "Introduction":http://docs.celeryproject.org/en/latest/getting-started/introduction.html 
 * "Help resources":http://ask.github.com/celery/getting-started/resources.html 
 * "User doc":http://ask.github.com/celery/index.html 
 * "Source code":https://github.com/ask/celery/tree/master/celery 

 h2. Configure a node to run a Celery worker 

 To configure a node to run a Celery worker: 

 * Ensure MAUS is available on the node you want to use as a Celery worker. 
 ** *Important note* MAUS sends configuration information to Celery workers. This can include absolute paths. Therefore, when deploying workers please ensure that the *path to the MAUS deployment used by the Celery worker* is the *same* as the *path to the MAUS deployment within which you are running MAUS data analysis workflows*. This is a known issue - #918 
 * Run: 
 <pre> 
 $ source env.sh 
 </pre> 
 * If you have deployed RabbitMQ on a remote host, 
 ** Edit @src/common_py/mauscelery/celeryconfig.py@ 
 ** Change 
 <pre> 
 BROKER_HOST = "localhost" 
 </pre> 
 to specify the full hostname of the host on which RabbitMQ was deployed e.g. 
 <pre> 
 BROKER_HOST = "maus.epcc.ed.ac.uk" 
 </pre> 
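 Other broker settings can be overridden in the same file if your RabbitMQ deployment does not use the defaults. A minimal sketch (assuming the RabbitMQ user name and virtual host shown in the worker statistics later on this page; these are standard Celery settings, not MAUS-specific ones): 
 <pre> 
 # src/common_py/mauscelery/celeryconfig.py - broker settings only. 
 # BROKER_HOST is the host on which RabbitMQ runs; the remaining 
 # values are only needed if your RabbitMQ deployment does not use 
 # the values already set in this file. 
 BROKER_HOST = "maus.epcc.ed.ac.uk" 
 BROKER_PORT = 5672 
 BROKER_USER = "maus" 
 BROKER_VHOST = "maushost" 
 </pre> 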

 h2. Start up a Celery worker 

 To start up a Celery worker, run: 
 <pre> 
 $ celeryd -l INFO -n WORKER_ID 
 </pre> 
 where @WORKER_ID@ is a unique ID for the worker. You should always provide one, as it helps with monitoring when you are running many workers. 

 h3. Specify the number of sub-processes 

 The worker will set up a number of sub-processes, depending on the number of CPUs available to your node. You can explicitly set the number of sub-processes via the @-c@ flag e.g. 
 <pre> 
 $ celeryd -l INFO -n WORKER_ID -c 2 
 </pre> 

 You can specify as many sub-processes as you like but exceeding the number of CPUs available may cause performance to suffer. 

 h3. Purge queued tasks 

 If any tasks are held by RabbitMQ awaiting dispatch to workers then the worker, when started, will immediately start to process these. If you want to purge this queue before your worker starts (useful if one or more workers have run into problems and you do not want the backlog of pending tasks to be processed after you restart them), use the @--purge@ flag e.g. 
 <pre> 
 $ celeryd -l INFO -n WORKER_ID --purge 
 </pre> 

 See also "Purge queued tasks" below. 

 h3. Specify the logging level 

 The @-l@ flag specifies the logging level of the worker. The other options, aside from @INFO@, are @DEBUG@, @WARNING@, @ERROR@ and @CRITICAL@ e.g. 
 <pre> 
 $ celeryd -l DEBUG -n WORKER_ID  
 </pre> 

 The default logging format is: 
 <pre> 
 [%(asctime)s: %(levelname)s/%(processName)s] %(message)s 
 </pre> 
 You can specify another format in @src/common_py/mauscelery/celeryconfig.py@ using a @CELERYD_LOG_FORMAT@ variable. See the Python @logging@ module and the Celery logging configuration documentation, http://ask.github.com/celery/configuration.html#logging, for more on logging. 
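 For example, a minimal sketch that adds the logger name to the default format (any field supported by the Python @logging@ module can be used): 
 <pre> 
 # src/common_py/mauscelery/celeryconfig.py 
 # Extend the default worker log format with the logger name. 
 CELERYD_LOG_FORMAT = \ 
     "[%(asctime)s: %(levelname)s/%(processName)s/%(name)s] %(message)s" 
 </pre> 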

 h3. Beware resizing the Celery worker xterm 

 Resizing the Celery worker xterm causes a Celery worker's sub-processes to die and new ones to be created. This may cause complications for any currently-running jobs. It is unclear why this arises (it is not a MAUS-specific bug). 

 h2. Initialise the worker to execute MAUS tasks 

 By default, Celery workers use the default MAUS configuration and apply the @MapPyDoNothing@ transform. They need to be explicitly configured to apply other transforms. 

 MAUS tasks support dynamic reconfiguration of Celery workers. When executing a client that uses @Go.py@, the MAUS configuration and a specification of the client's transforms (or mappers) are broadcast to the Celery workers, which automatically reconfigure themselves, create the transforms and birth them. 

 To support this, Celery workers provide custom MAUS broadcast commands to allow querying and updating of the worker nodes. These are used within @Go.py@ so you won't normally need to worry about them, but they may prove useful for debugging or out of curiosity. If so, please see "MAUS-specific monitoring and management" below. 

 h2. Monitoring and management 

 Celery provides a number of commands to allow inspection of Celery workers. Many of these can be invoked from the command-line, via the @celeryctl@ command, or from within Python. For full information, see http://ask.github.com/celery/userguide/monitoring.html 

 In the following, the names shown (e.g. @worker1@, @worker2@) are the worker names specified with the @-n@ flag when starting the workers using @celeryd@. 

 In the invocations of the Celery @inspect@ command in Python, a specific Celery worker can be specified e.g. 
 <pre> 
 i = inspect("worker1") 
 </pre> 
 where the worker name is that given to the @-n@ flag when @celeryd@ is invoked to start the worker. 

 h3. Check live workers 

 Check for live Celery workers that have registered with RabbitMQ and are available for use: 
 <pre> 
 $ celeryctl status 
 worker1: OK 
 worker2: OK 

 2 nodes online. 
 </pre> 

 <pre> 
 $ celeryctl inspect ping 
 <- ping 
 -> worker1: OK 
     pong 
 -> worker2: OK 
     pong 
 </pre> 

 To query a specific worker, use @-d@ e.g.: 
 <pre> 
 $ celeryctl inspect -d worker1 ping 
 <- ping 
 -> worker1: OK 
 </pre> 

 From within Python, use: 
 <pre> 
 $ from celery.task.control import inspect 
 $ i = inspect() 
 $ i.ping() 
 {u'worker1': u'pong', u'worker2': u'pong'} 
 </pre> 
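 If you submit jobs from a script, it can be useful to wait until an expected number of workers are online before proceeding. A small sketch using the same @ping@ call (the expected worker count of 2 is just an example): 
 <pre> 
 import time 
 from celery.task.control import inspect 
 
 expected_workers = 2  # the number of workers you expect to be online 
 while True: 
     replies = inspect().ping() or {} 
     if len(replies) >= expected_workers: 
         break 
     print "Only %d of %d workers online, waiting..." \ 
         % (len(replies), expected_workers) 
     time.sleep(5) 
 </pre> 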

 h3. Check worker configuration 

 <pre> 
 $ celeryctl inspect stats 
 <- stats 
 -> worker1: OK 
     {u'autoscaler': {}, 
      u'consumer': {u'broker': {u'connect_timeout': 4, 
                                u'hostname': u'127.0.0.1', 
                                u'insist': False, 
                                u'login_method': u'AMQPLAIN', 
                                u'port': 5672, 
                                u'ssl': False, 
                                u'transport': u'amqp', 
                                u'transport_options': {}, 
                                u'userid': u'maus', 
                                u'virtual_host': u'maushost'}, 
                    u'prefetch_count': 8}, 
      u'pool': {u'max-concurrency': 2, 
                u'max-tasks-per-child': None, 
                u'processes': [11991, 11992], 
                u'put-guarded-by-semaphore': True, 
                u'timeouts': [None, None]}, 
      u'total': {}} 
 -> worker2: OK 
     {u'autoscaler': {}, 
      u'consumer': {u'broker': {u'connect_timeout': 4, 
                                u'hostname': u'maus.epcc.ed.ac.uk', 
                                u'insist': False, 
                                u'login_method': u'AMQPLAIN', 
                                u'port': 5672, 
                                u'ssl': False, 
                                u'transport': u'amqp', 
                                u'transport_options': {}, 
                                u'userid': u'maus', 
                                u'virtual_host': u'maushost'}, 
                    u'prefetch_count': 8}, 
      u'pool': {u'max-concurrency': 2, 
                u'max-tasks-per-child': None, 
                u'processes': [21964, 21965], 
                u'put-guarded-by-semaphore': True, 
                u'timeouts': [None, None]}, 
      u'total': {}} 
 </pre> 

 From within Python, use: 
 <pre> 
 $ from celery.task.control import inspect 
 $ i = inspect() 
 $ i.stats() 
 {...} 
 </pre> 
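 The @stats@ output can also be used to check that each worker is talking to the intended RabbitMQ host, e.g. after changing @BROKER_HOST@ as described above. A small sketch: 
 <pre> 
 from celery.task.control import inspect 
 
 # Print the broker host that each worker is connected to. 
 stats = inspect().stats() or {} 
 for worker, info in stats.items(): 
     print worker, "->", info["consumer"]["broker"]["hostname"] 
 </pre> 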

 h3. Check registered tasks 

 Check the tasks that each worker can execute. 

 <pre> 
 $ celeryctl inspect registered 
 <- registered 
 -> worker1: OK 
     * celery.backend_cleanup 
     * celery.chord 
     * celery.chord_unlock 
     * celery.ping 
     * mauscelery.maustasks.MausGenericTransformTask 
 -> worker2: OK 
     * celery.backend_cleanup 
     * celery.chord 
     * celery.chord_unlock 
     * celery.ping 
     * mauscelery.maustasks.MausGenericTransformTask 
 </pre> 

 From within Python, use: 
 <pre> 
 $ from celery.task.control import inspect 
 $ i = inspect() 
 $ i.registered() 
 {u'worker1': [u'celery.backend_cleanup', 
   u'celery.chord', 
   u'celery.chord_unlock', 
   u'celery.ping', 
   u'mauscelery.maustasks.MausGenericTransformTask'], 
  u'worker2': [u'celery.backend_cleanup', 
   u'celery.chord', 
   u'celery.chord_unlock', 
   u'celery.ping', 
   u'mauscelery.maustasks.MausGenericTransformTask']} 
 </pre> 

 h3. Check task states 

 Check the tasks submitted for execution to a Celery worker by a client. 

 Check the tasks currently being executed by the worker: 
 <pre> 
 $ celeryctl inspect active 
 <- active 
 -> worker1: OK 
     * {u'args': u'(\'{MAUS_SPILL_DOCUMENT}\', \'maus.epcc.ed.ac.uk (13067)\', 13)',  
 u'time_start':1327503329.679438, u'name':u'mauscelery.maustasks.MausGenericTransformTask',  
 u'delivery_info':{u'consumer_tag': u'2', u'routing_key': u'celery', u'exchange':u'celery'},  
 u'hostname': u'worker1', u'acknowledged': True, u'kwargs':u'{}',  
 u'id': u'7222138d-bb2d-4e1b-ba70-5c0f9e90aa08', u'worker_pid':13059} 
     * {...} 
 ... 
 </pre> 
 Note the @worker_pid@, which specifies the process ID of the Celery sub-process executing the task. 

 Check the tasks received by the worker but awaiting execution: 
 <pre> 
 $ celeryctl inspect reserved 
 <- reserved 
 -> worker1: OK 
     * {u'args': u'(\'{MAUS_SPILL_DOCUMENT}\', \'maus.epcc.ed.ac.uk (13067)\', 95)',  
 u'time_start': None, u'name': u'mauscelery.maustasks.MausGenericTransformTask', 
 u'delivery_info': {u'consumer_tag': u'2', u'routing_key': u'celery', u'exchange': u'celery'},  
 u'hostname': u'worker1', u'acknowledged': False, u'kwargs': u'{}',  
 u'id': u'ee1b3a88-58cc-4e26-b77d-4424ec9161d1', u'worker_pid': None} 
     * {...} 
 ... 
 </pre> 
 Note that the @worker_pid@, the process ID of the Celery sub-process executing the task, is currently @None@. 

 From within Python, use: 
 <pre> 
 $ from celery.task.control import inspect 
 $ i = inspect() 
 $ i.active() 
 {u'worker1': [...], ...} 

 $ i.reserved() 
 {u'worker1': [...], ...} 
 </pre> 
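 A small sketch that combines these calls to show, for each worker, which sub-process (the @worker_pid@ above) is executing which task: 
 <pre> 
 from celery.task.control import inspect 
 
 # List the sub-process executing each active task on each worker. 
 active = inspect().active() or {} 
 for worker, tasks in active.items(): 
     for task in tasks: 
         print "%s: task %s running in sub-process %s" \ 
             % (worker, task["id"], task["worker_pid"]) 
 </pre> 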

 h3. Get results 

 When a task is submitted to a Celery worker, a unique task ID is generated. Using this ID you can collect the results of a task, provided you have not already collected them, using @celeryctl@ e.g. 

 <pre> 
 $ celeryctl result 95b3c56e-bea9-433d-aecd-920601c5ad20 
 {"digits": [{"tdc_counts": 1, "channel_id": {"fiber_number": 106, "tracker_numbe 
 r": 0, "type": "Tracker", "station_number": 1, "plane_number": 2}, "adc_counts": 
 ... 
 </pre> 

 *Warning* Results can only be collected once. If you do this while running an analysis it may cause problems with your client (which may wait indefinitely for results that you have already collected). This command should only be used for debugging or to establish why things have gone wrong, for example if a Celery worker needs to be terminated and you want to retrieve its results from RabbitMQ. 
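 From within Python, a task result can also be fetched by its ID using Celery's @AsyncResult@ class. A sketch, assuming the result backend configured in @src/common_py/mauscelery/celeryconfig.py@ (the same collect-once warning applies): 
 <pre> 
 from celery.result import AsyncResult 
 
 # The task ID is reported by the client that submitted the task. 
 result = AsyncResult("95b3c56e-bea9-433d-aecd-920601c5ad20") 
 if result.ready(): 
     print result.get() 
 else: 
     print "Task state:", result.state 
 </pre> 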

 h3. Purge queued tasks 

 To purge tasks currently awaiting dispatch from RabbitMQ, which can be useful if one or more workers have run into problems and you don't want the backlog of pending tasks to be processed after you restart them: 

 <pre> 
 $ celeryctl purge 
 Purged 4 messages from 1 known task queue. 
 </pre> 

 From within Python, use: 
 <pre> 
 $ from celery.task.control import discard_all 
 $ discard_all() 
 4 
 </pre> 

 h3. Shut down workers 

 All workers can be shut down from within Python via: 

 <pre> 
 $ from celery.task.control import broadcast 
 $ broadcast("shutdown") 
 </pre> 

 Each worker will complete the tasks it is currently processing before shutting down. 

 Alternatively, you can use the Linux @kill@ command e.g.: 
 <pre> 
 $ ps -a 
 12614 pts/6      00:00:02 celeryd 
 12627 pts/6      00:00:00 celeryd 
 12628 pts/6      00:00:00 celeryd 
 $ kill -s TERM 12614 
 </pre> 
 The process ID should be that of the main worker process. This will usually have the lowest process ID. 

 To kill the worker *immediately*, without waiting for currently processing tasks to complete, use: 
 <pre> 
 $ kill -s KILL 12614 
 </pre> 

 To kill all @celeryd@ processes, use: 
 <pre> 
 $ ps auxww | grep celeryd | awk '{print $2}' | xargs kill -9 
 </pre> 

 h2. MAUS-specific monitoring and management 

 These actions are supported by Celery workers running on top of MAUS. 

 h3. Get worker MAUS configuration 

 Get the current MAUS configuration known to the workers, and the transforms that the workers will execute:  

 <pre> 
 $ from celery.task.control import broadcast 
 $ broadcast("get_maus_configuration", reply=True) 
 [{u'worker1': {u'config_id': 123, u'configuration': u'{}', u'transform': u'MapPyDoNothing'}}] 
 </pre> 

 where: 

 * @config_id@ - a unique ID for the current worker configuration. 
 * @configuration@ - the worker's MAUS configuration as forwarded by a client. 
 * @transform@ - the transforms that the worker will apply to a spill. A list represents a MapPyGroup and nested lists represent nested MapPyGroups. 

 h3. Birth the worker transforms with a MAUS configuration 

 Update a worker with a new MAUS configuration and transforms. Death is invoked on the existing transforms, then the new transforms are created and birth is invoked on them with the new configuration. 
 <pre> 
 $ from celery.task.control import broadcast 
 $ broadcast("birth", arguments={"configuration": """{"TOFconversionFactor":0.01}""", \ 
     "transform": "MapPyDoNothing", \ 
     "config_id": 123}, reply=True) 
 [{u'maus.epcc.ed.ac.uk': {u'status': u'ok'}}] 
 </pre> 

 where: 

 * @config_id@ - a unique ID for the worker configuration. For the update to be applied this must be different to that currently held by the workers and returned by @get_maus_configuration@ above. 
 * @configuration@ - the MAUS configuration. 
 * @transform@ - the transforms that the worker will apply to a spill. A list represents a MapPyGroup and nested lists represent nested MapPyGroups. 

 If using MapCppSimulation, the transforms can take a while to birth, and there is a risk that the @broadcast@ call will return before the workers are ready and have replied. To avoid this, run: 

 <pre> 
 # Get the current number of workers 
 $ from celery.task.control import inspect 
 $ i = inspect() 
 $ num_workers = len(i.active()) 
 $ print num_workers 
 # transform_spec and json_config_doc are assumed to hold the transform 
 # specification and MAUS configuration document to send to the workers 
 $ results = broadcast("birth", arguments={"transform": transform_spec, \ 
     "configuration": json_config_doc, "config_id": 1234}, \ 
     reply=True, timeout=300, limit=num_workers) 
 $ print results 
 </pre> 

 where 

 * @timeout@ is the maximum time in seconds to wait for any worker to respond. 
 * @limit@ is the number of workers to wait for replies before exiting. 

 This ensures that the broadcast will wait up to 300s for responses from all the workers, but if all the workers reply promptly it exits there and then without waiting the full 300s. 

 h3. Death the worker transforms 

 Invoke death on the workers' transforms: 
 <pre> 
 $ from celery.task.control import broadcast 
 $ broadcast("death", reply=True) 
 [{u'maus.epcc.ed.ac.uk': {u'status': u'ok'}}] 
 </pre> 

 If @death@ has already been invoked, calling it again does nothing. 
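 A short sketch of a clean teardown, invoking @death@ on the worker transforms before shutting the workers down (both commands are described above): 
 <pre> 
 from celery.task.control import broadcast 
 
 # Invoke death on the worker transforms, then shut the workers down. 
 print broadcast("death", reply=True) 
 broadcast("shutdown") 
 </pre> 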