
h1. Celery configuration and monitoring

{{>toc}}

MAUS can be used with the Celery asynchronous distributed task queue to allow transform steps to be executed on multiple processors.

For full information on using Celery, see http://celeryproject.org/ and:

* "Introduction":http://docs.celeryproject.org/en/latest/getting-started/introduction.html
* "Help resources":http://ask.github.com/celery/getting-started/resources.html
* "User doc":http://ask.github.com/celery/index.html
* "Source code":https://github.com/ask/celery/tree/master/celery

h2. Installation

Celery is automatically downloaded and installed when you build MAUS.
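
You can verify that Celery is available in your MAUS environment (after running @source env.sh@) with a quick check from Python:

<pre>
$ python
>>> import celery
>>> celery.__version__
'2.4.6'
</pre>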

h2. Configure a host to run a Celery worker

To configure a host to run a Celery worker:

* Ensure MAUS is available on the host you want to use as a Celery worker.
* Ensure you have run:
<pre>
$ source env.sh
</pre>
* If you have deployed RabbitMQ on a remote host,
** Edit @src/common_py/mauscelery/celeryconfig.py@
** Change
<pre>
BROKER_HOST = "localhost"
</pre>
to specify the full hostname of the host on which RabbitMQ was deployed e.g.
<pre>
BROKER_HOST = "maus.epcc.ed.ac.uk"
</pre>
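
The broker settings in @celeryconfig.py@ use Celery's standard configuration names. As a rough sketch (the user ID and virtual host values are taken from the @celeryctl inspect stats@ output shown later on this page; check the file itself for the authoritative list):

<pre>
# src/common_py/mauscelery/celeryconfig.py - broker settings (sketch).
BROKER_HOST = "maus.epcc.ed.ac.uk"  # host running RabbitMQ
BROKER_PORT = 5672                  # default AMQP port
BROKER_USER = "maus"                # RabbitMQ user ID
BROKER_VHOST = "maushost"           # RabbitMQ virtual host
</pre>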

h2. Start up a Celery worker

To start up a Celery worker, run:
<pre>
$ celeryd -l INFO -n WORKER_ID
</pre>
where @WORKER_ID@ is a unique ID for the worker. You should always provide one, as it helps with monitoring when you are running many workers.
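
For example, to start a worker named @worker1@ (the name used in the monitoring examples below):
<pre>
$ celeryd -l INFO -n worker1
</pre>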

h3. Specify the number of sub-processes

The worker will set up a number of sub-processes, depending on the number of CPUs available to your host. You can explicitly set the number of sub-processes via the @-c@ flag e.g.
<pre>
$ celeryd -l INFO -n WORKER_ID -c 2
</pre>
You can specify as many sub-processes as you like, but exceeding the number of CPUs available may cause performance to suffer.
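
To see how many CPUs are detected, and hence how many sub-processes a worker will create by default, you can check with the Python standard library:

<pre>
$ python
>>> import multiprocessing
>>> multiprocessing.cpu_count()
2
</pre>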

h3. Purge queued tasks

If any tasks are held by RabbitMQ awaiting dispatch to workers, the worker, when started, will immediately start to process them. If you want to purge this queue before your worker starts (useful if one or more workers have run into problems and you don't want the backlog of pending tasks to be processed after you restart them), use the @--purge@ flag e.g.
<pre>
$ celeryd -l INFO -n WORKER_ID --purge
</pre>
See also "Purge queued tasks" below.

h3. Specify the logging level

The @-l@ flag specifies the logging level of the worker. The options, aside from @INFO@, are @DEBUG@, @WARNING@, @ERROR@ and @CRITICAL@ e.g.
<pre>
$ celeryd -l DEBUG -n WORKER_ID
</pre>
The default logging format is:
<pre>
[%(asctime)s: %(levelname)s/%(processName)s] %(message)s
</pre>
You can specify another format in @src/common_py/mauscelery/celeryconfig.py@ using a @CELERYD_LOG_FORMAT@ variable. See the Python @logging@ module and http://ask.github.com/celery/configuration.html#logging for more on logging.
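
For example, to add the logger name to each message (a sketch; any format string understood by the Python @logging@ module can be used):

<pre>
# In src/common_py/mauscelery/celeryconfig.py
CELERYD_LOG_FORMAT = \
    "[%(asctime)s: %(levelname)s/%(processName)s] %(name)s %(message)s"
</pre>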

h2. Initialise the worker to execute MAUS tasks

By default, Celery workers use the default MAUS configuration and apply the MapPyDoNothing transform. They need to be explicitly configured to apply other transforms.

If you are running analyses using a client that uses Go.py, then this configuration is done by Go.py on your behalf.

If you are not using Go.py, then you can do this manually (or in your own Python code) using the MAUS-specific commands described below in "Birth the worker transforms".

h2. Monitoring and management

Celery provides a number of commands to allow inspection of Celery workers. Many of these can be invoked from the command line, via the @celeryctl@ command, or from within Python. For full information, see http://ask.github.com/celery/userguide/monitoring.html

In the following, the names shown e.g. @worker1@, @worker2@ etc. are the worker names specified with the @-n@ flag when starting the workers using @celeryd@.

In the Python invocations of the Celery @inspect@ command, a specific Celery worker can be specified e.g.
<pre>
i = inspect("worker1")
</pre>
where the worker name is that given to the @-n@ flag when @celeryd@ is invoked to start the worker.

h3. Check live workers

Check for live Celery workers that have registered with RabbitMQ and are available for use:
<pre>
$ celeryctl status
worker1: OK
worker2: OK

2 nodes online.
</pre>

<pre>
$ celeryctl inspect ping
<- ping
-> worker1: OK
    pong
-> worker2: OK
    pong
</pre>

To target a specific worker, use @-d@ e.g.:
<pre>
$ celeryctl inspect -d worker1 ping
<- ping
-> worker1: OK
</pre>

From within Python, use:
<pre>
$ from celery.task.control import inspect
$ i = inspect()
$ i.ping()
{u'worker1': u'pong', u'worker2': u'pong'}
</pre>

h3. Check worker configuration

Check the broker and pool configuration of each worker:

<pre>
$ celeryctl inspect stats
<- stats
-> worker1: OK
    {u'autoscaler': {},
     u'consumer': {u'broker': {u'connect_timeout': 4,
                               u'hostname': u'127.0.0.1',
                               u'insist': False,
                               u'login_method': u'AMQPLAIN',
                               u'port': 5672,
                               u'ssl': False,
                               u'transport': u'amqp',
                               u'transport_options': {},
                               u'userid': u'maus',
                               u'virtual_host': u'maushost'},
                   u'prefetch_count': 8},
     u'pool': {u'max-concurrency': 2,
               u'max-tasks-per-child': None,
               u'processes': [11991, 11992],
               u'put-guarded-by-semaphore': True,
               u'timeouts': [None, None]},
     u'total': {}}
-> worker2: OK
    {u'autoscaler': {},
     u'consumer': {u'broker': {u'connect_timeout': 4,
                               u'hostname': u'maus.epcc.ed.ac.uk',
                               u'insist': False,
                               u'login_method': u'AMQPLAIN',
                               u'port': 5672,
                               u'ssl': False,
                               u'transport': u'amqp',
                               u'transport_options': {},
                               u'userid': u'maus',
                               u'virtual_host': u'maushost'},
                   u'prefetch_count': 8},
     u'pool': {u'max-concurrency': 2,
               u'max-tasks-per-child': None,
               u'processes': [21964, 21965],
               u'put-guarded-by-semaphore': True,
               u'timeouts': [None, None]},
     u'total': {}}
</pre>

From within Python, use:
<pre>
$ from celery.task.control import inspect
$ i = inspect()
$ i.stats()
{...}
</pre>

h3. Check registered tasks

Check the tasks that each worker can execute.

<pre>
$ celeryctl inspect registered
<- registered
-> worker1: OK
    * celery.backend_cleanup
    * celery.chord
    * celery.chord_unlock
    * celery.ping
    * mauscelery.maustasks.MausGenericTransformTask
-> worker2: OK
    * celery.backend_cleanup
    * celery.chord
    * celery.chord_unlock
    * celery.ping
    * mauscelery.maustasks.MausGenericTransformTask
</pre>

From within Python, use:
<pre>
$ from celery.task.control import inspect
$ i = inspect()
$ i.registered()
{u'worker1': [u'celery.backend_cleanup',
  u'celery.chord',
  u'celery.chord_unlock',
  u'celery.ping',
  u'mauscelery.maustasks.MausGenericTransformTask'],
 u'worker2': [u'celery.backend_cleanup',
  u'celery.chord',
  u'celery.chord_unlock',
  u'celery.ping',
  u'mauscelery.maustasks.MausGenericTransformTask']}
</pre>

h3. Check task states

Check the tasks submitted for execution to a Celery worker by a client.

Check the tasks currently being executed by the worker:
<pre>
$ celeryctl inspect active
<- active
-> worker1: OK
    * {u'args': u'(\'{MAUS_SPILL_DOCUMENT}\', \'maus.epcc.ed.ac.uk (13067)\', 13)', 
u'time_start':1327503329.679438, u'name':u'mauscelery.maustasks.MausGenericTransformTask', 
u'delivery_info':{u'consumer_tag': u'2', u'routing_key': u'celery', u'exchange':u'celery'}, 
u'hostname': u'worker1', u'acknowledged': True, u'kwargs':u'{}', 
u'id': u'7222138d-bb2d-4e1b-ba70-5c0f9e90aa08', u'worker_pid':13059}
    * {...}
...
</pre>
Note the @worker_pid@, which gives the process ID of the Celery sub-process executing the task.

Check the tasks received by the worker but awaiting execution:
<pre>
$ celeryctl inspect reserved
<- reserved
-> worker1: OK
    * {u'args': u'(\'{MAUS_SPILL_DOCUMENT}\', \'maus.epcc.ed.ac.uk (13067)\', 95)', 
u'time_start': None, u'name': u'mauscelery.maustasks.MausGenericTransformTask',
u'delivery_info': {u'consumer_tag': u'2', u'routing_key': u'celery', u'exchange': u'celery'}, 
u'hostname': u'worker1', u'acknowledged': False, u'kwargs': u'{}', 
u'id': u'ee1b3a88-58cc-4e26-b77d-4424ec9161d1', u'worker_pid': None}
    * {...}
...
</pre>
Note that @worker_pid@, the process ID of the Celery sub-process executing the task, is @None@ for tasks that have not yet started.

From within Python, use:
<pre>
$ from celery.task.control import inspect
$ i = inspect()
$ i.active()
{u'worker1': [...], ...}

$ i.reserved()
{u'worker1': [...], ...}
</pre>

h3. Get results

When a task is submitted to a Celery worker, a unique task ID is generated. Using this ID you can collect the results of the task, provided you have not collected them already, using @celeryctl@ e.g.

<pre>
$ celeryctl result 95b3c56e-bea9-433d-aecd-920601c5ad20
{"digits": [{"tdc_counts": 1, "channel_id": {"fiber_number": 106, "tracker_numbe
r": 0, "type": "Tracker", "station_number": 1, "plane_number": 2}, "adc_counts":
...
</pre>
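
From within Python, a task ID is available when submitting a task, and the result can be fetched by ID via @AsyncResult@. A minimal sketch (in normal use Go.py submits tasks for you; the argument tuple here follows the pattern visible in the @celeryctl inspect active@ output above, and the spill document is a placeholder):

<pre>
$ from celery.execute import send_task
$ from celery.result import AsyncResult
$ spill = '{"daq_event_type": "physics_event"}'  # placeholder spill document
$ result = send_task("mauscelery.maustasks.MausGenericTransformTask", \
      args=[spill, "maus.epcc.ed.ac.uk (13067)", 1])
$ result.task_id
u'95b3c56e-bea9-433d-aecd-920601c5ad20'
$ AsyncResult(result.task_id).get()  # collect the result - can only be done once
u'{"digits": ...}'
</pre>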

*Warning* Results can only be collected once. If you collect them while an analysis is running, it may cause problems with your client, which may wait for results that you have already collected. This command should only be used for debugging, or for establishing why things are going wrong, for example, if a Celery worker needs to be terminated and you want to recover its results from RabbitMQ.

h3. Purge queued tasks

Purge tasks currently awaiting dispatch from RabbitMQ. This can be useful if one or more workers have run into problems and you don't want the backlog of pending tasks to be processed after you restart them.

<pre>
$ celeryctl purge
Purged 4 messages from 1 known task queue.
</pre>

From within Python, use:
<pre>
$ from celery.task.control import discard_all
$ discard_all()
4
</pre>

h3. Shut down workers

All workers can be shut down from within Python via:

<pre>
$ from celery.task.control import broadcast
$ broadcast("shutdown")
</pre>

Each worker will complete the tasks it is currently processing before shutting down.
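
To shut down specific workers only, @broadcast@ accepts a @destination@ argument listing worker names (a sketch; the names are those given to the @-n@ flag):

<pre>
$ from celery.task.control import broadcast
$ broadcast("shutdown", destination=["worker1"])
</pre>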

Alternatively, you can use the Linux @kill@ command e.g.:
<pre>
$ ps -a
12614 pts/6    00:00:02 celeryd
12627 pts/6    00:00:00 celeryd
12628 pts/6    00:00:00 celeryd
$ kill -s TERM 12614
</pre>
The process ID should be that of the main worker process. This will usually have the lowest process ID.

To kill the worker *immediately*, without waiting for currently processing tasks to complete, use:
<pre>
$ kill -s KILL 12614
</pre>

To kill all @celeryd@ processes, use:
<pre>
$ ps auxww | grep celeryd | awk '{print $2}' | xargs kill -9
</pre>

h2. MAUS-specific monitoring and management

These actions are supported by Celery workers running on top of MAUS.

h3. Get worker MAUS configuration

Get the current MAUS configuration known to the workers, and the transforms that the workers will execute:

<pre>
$ from celery.task.control import broadcast
$ broadcast("get_maus_configuration", reply=True)
[{u'worker1': {u'config_id': 123, u'configuration': u'{}', u'transform': u'MapPyDoNothing'}}]
</pre>

where:

* @config_id@ - a unique ID for the current worker configuration.
* @configuration@ - the worker's MAUS configuration as forwarded by a client.
* @transform@ - the transforms that the worker will apply to a spill. A list represents a MapPyGroup and nested lists represent nested MapPyGroups.
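
For example, the @transform@ field of a worker configured with a nested set of transforms might look like this (transform names are for illustration only):

<pre>
[u'MapPyBeamMaker', [u'MapCppSimulation', u'MapCppTrackerDigits']]
</pre>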

h3. Birth the worker transforms with a MAUS configuration

Update a worker with a new MAUS configuration and transforms. Death is invoked on the existing transforms, then the new transforms are created and birth is invoked on them with the new configuration.
<pre>
$ from celery.task.control import broadcast
$ broadcast("birth", arguments={"configuration":"""{"TOFconversionFactor":0.01}""", \
    "transform":"MapPyDoNothing", \
    "config_id":123}, reply=True)
[{u'maus.epcc.ed.ac.uk': {u'7692': {u'status': u'ok'}, u'7693': {u'status': u'ok'}}}]
</pre>

where:

* @config_id@ - a unique ID for the worker configuration.
* @configuration@ - the MAUS configuration.
* @transform@ - the transforms that the worker will apply to a spill. A list represents a MapPyGroup and nested lists represent nested MapPyGroups.

The status returned shows the worker IDs and the sub-process(es) within each worker, along with whether the operation was successfully invoked within those sub-processes.

If @birth@ is called twice with the same @config_id@ then this action does nothing and the status returned is:

<pre>
[{u'maus.epcc.ed.ac.uk': {}}]
</pre>

h3. Death the worker transforms

Invoke death on the workers' transforms:
<pre>
$ from celery.task.control import broadcast
$ broadcast("death", reply=True)
[{u'maus.epcc.ed.ac.uk': {u'7692': {u'status': u'ok'}, u'7693': {u'status': u'ok'}}}]
</pre>

The status returned shows the worker IDs and the sub-process(es) within each worker, along with whether the operation was successfully invoked within those sub-processes.

If @death@ is called twice then this action does nothing and the status returned is:

<pre>
[{u'maus.epcc.ed.ac.uk': {}}]
</pre>

h3. Get information on worker processes

Get a list of the worker node processes and their child processes (the child processes are responsible for executing jobs):

<pre>
$ from celery.task.control import broadcast
$ broadcast("get_process_pool", reply=True)
[{u'worker1': {u'master_name': u'MainProcess',
   u'master_pid': 12614,
   u'pool_pids': [12627, 12628]}}]
</pre>

The sub-process IDs correspond to those visible in the @processes@ field of the document returned by the @celeryctl inspect stats@ command, described in "Check worker configuration" above, and to those visible when running the Linux @ps -a@ command e.g.
<pre>
$ ps -a
12614 pts/6    00:00:02 celeryd
12627 pts/6    00:00:00 celeryd
12628 pts/6    00:00:00 celeryd
</pre>

h3. Restart the workers

To restart Celery workers, run:
<pre>
$ from celery.task.control import broadcast
$ broadcast("restart_pool", reply=True)
[{u'worker1': {u'status': u'ok'}}]
</pre>
This instructs each Celery worker to terminate its sub-processes and spawn new ones. The new sub-processes will inherit any updated configuration or transforms sent using @birth@ above.

In the @celeryd@ window you might see:
<pre>
Consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
...
...
...
error: [Errno 4] Interrupted system call
</pre>
This can be ignored. If in doubt, you can use @get_maus_configuration@ above to check that the workers have the new configuration.

Note: Celery 2.5 supports a @broadcast("pool_restart")@ command which has the same intent as the above. At present MAUS uses Celery 2.4.6, which is the latest version available via @easy_install@.