h1. Celery configuration and monitoring

MAUS can be used with the Celery asynchronous distributed task queue to allow transform steps to be executed on multiple processors.

For full information on using Celery, see http://celeryproject.org/ and:

* "Introduction":http://docs.celeryproject.org/en/latest/getting-started/introduction.html
* "Help resources":http://ask.github.com/celery/getting-started/resources.html
* "User doc":http://ask.github.com/celery/index.html
* "Source code":https://github.com/ask/celery/tree/master/celery

h2. Installation

Celery is automatically downloaded and installed when you build MAUS.
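
As an optional sanity check, you can confirm the installation by querying the Celery version from within the MAUS environment. This is a minimal sketch; the version number shown is only an example and will depend on your MAUS build:
<pre>
$ source env.sh
$ python -c "import celery; print celery.__version__"
2.5.1
</pre>
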
h2. Configure a host to run a Celery worker

To configure a host to run a Celery worker:

* Ensure MAUS is available on the host you want to use as a Celery worker.
* Ensure you have run:
<pre>
$ source env.sh
</pre>
* If you have deployed RabbitMQ on a remote host (see the configuration sketch after this list):
** Edit @src/common_py/mauscelery/celeryconfig.py@
** Change
<pre>
BROKER_HOST = "localhost"
</pre>
to specify the full hostname of the host on which RabbitMQ was deployed, e.g.
<pre>
BROKER_HOST = "maus.epcc.ed.ac.uk"
</pre>
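
As a sketch, assuming the standard Celery broker settings and the RabbitMQ account visible in the @celeryctl inspect stats@ output shown later on this page (user @maus@, virtual host @maushost@, port 5672), the relevant fragment of @celeryconfig.py@ might look like the following; adjust the values to match your own RabbitMQ deployment:
<pre>
# src/common_py/mauscelery/celeryconfig.py (illustrative fragment)
BROKER_HOST = "maus.epcc.ed.ac.uk"  # host running RabbitMQ
BROKER_PORT = 5672                  # default RabbitMQ port
BROKER_USER = "maus"                # RabbitMQ user ID
BROKER_VHOST = "maushost"           # RabbitMQ virtual host
</pre>
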
h2. Start up a Celery worker

To start up a Celery worker, run:
<pre>
$ celeryd -l INFO -n WORKER_ID
</pre>
where @WORKER_ID@ is a unique ID for the worker. You should always provide one, as it helps with monitoring when you are running many workers.

h3. Specify the number of sub-processes

The worker will set up a number of sub-processes, depending on the number of CPUs available to your host. You can explicitly set the number of sub-processes via the @-c@ flag e.g.
<pre>
$ celeryd -l INFO -n WORKER_ID -c 2
</pre>
You can specify as many sub-processes as you like, but exceeding the number of CPUs available may cause performance to suffer.

h3. Purge queued tasks

If any tasks are held by RabbitMQ awaiting despatch to workers then the worker, when started, will immediately start to process these. If you want to purge this queue before your worker starts, use the @--purge@ flag. This can be useful if one or more workers have run into problems and you don't want the backlog of pending tasks to be processed after you restart them e.g.
<pre>
$ celeryd -l INFO -n WORKER_ID --purge
</pre>
See also "Purge queued tasks" below.

h3. Specify the logging level

The @-l@ flag specifies the logging level of the worker. The other options, aside from @INFO@, are @DEBUG@, @WARNING@, @ERROR@ and @CRITICAL@ e.g.
<pre>
$ celeryd -l DEBUG -n WORKER_ID
</pre>
The default logging format is:
<pre>
[%(asctime)s: %(levelname)s/%(processName)s] %(message)s
</pre>
You can specify another format in @src/common_py/mauscelery/celeryconfig.py@ using a @CELERYD_LOG_FORMAT@ variable. See the Python @logging@ module, http://ask.github.com/celery/configuration.html#logging, for more on logging.
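
For example, adding a line like the following to @celeryconfig.py@ would change the message layout. The exact format string here is only an illustration; any standard Python @logging@ attributes may be used:
<pre>
# Illustrative alternative format. CELERYD_LOG_FORMAT is a standard
# Celery setting; the attributes are standard Python logging ones.
CELERYD_LOG_FORMAT = "%(asctime)s [%(levelname)s] %(processName)s: %(message)s"
</pre>
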

h2. Initialise the worker to execute MAUS tasks

By default, Celery workers use the default MAUS configuration and apply the MapPyDoNothing transform. They need to be explicitly configured to apply other transforms.

If you are running analyses using a client that uses Go.py then this configuration will be done by Go.py on your behalf.

If you are not using Go.py then you can do this manually, or in your own Python code, using the MAUS-specific commands described below in "Update workers with a new MAUS configuration and/or transforms" and "Restart the workers". A minimal sketch follows.
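
For example, from a Python prompt, a manual initialisation might look like this (both commands are described in detail in the sections below; the transform name is just an example):
<pre>
$ from celery.task.control import broadcast
$ broadcast("set_maus_configuration", arguments= \
{"transform":["MapPyPrint"]}, reply=True)
[{u'worker1': {u'status': u'ok'}}]
$ broadcast("restart_pool", reply=True)
[{u'worker1': {u'status': u'ok'}}]
</pre>
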

h2. Celery monitoring

Celery provides a number of commands to allow inspection of Celery workers. Many of these can be invoked from the command line, via the @celeryctl@ command, or from within Python. For full information, see http://ask.github.com/celery/userguide/monitoring.html

In the following, the names shown, e.g. @worker1@ and @worker2@, are the worker names specified with the @-n@ flag when starting the workers using @celeryd@.

h3. Check live workers

Check for live Celery workers that have registered with RabbitMQ and are available for use:
<pre>
$ celeryctl status
worker1: OK
worker2: OK

2 nodes online.
</pre>

<pre>
$ celeryctl inspect ping
<- ping
-> worker1: OK
    pong
-> worker2: OK
    pong
</pre>

To target a specific worker, use @-d@ e.g.:
<pre>
$ celeryctl inspect -d worker1 ping
<- ping
-> worker1: OK
</pre>

From within Python, use:
<pre>
$ from celery.task.control import inspect
$ i = inspect()
$ i.ping()
{u'worker1': u'pong', u'worker2': u'pong'}
</pre>

h3. Check worker configuration

<pre>
$ celeryctl inspect stats
<- stats
-> worker1: OK
    {u'autoscaler': {},
     u'consumer': {u'broker': {u'connect_timeout': 4,
                               u'hostname': u'127.0.0.1',
                               u'insist': False,
                               u'login_method': u'AMQPLAIN',
                               u'port': 5672,
                               u'ssl': False,
                               u'transport': u'amqp',
                               u'transport_options': {},
                               u'userid': u'maus',
                               u'virtual_host': u'maushost'},
                   u'prefetch_count': 8},
     u'pool': {u'max-concurrency': 2,
               u'max-tasks-per-child': None,
               u'processes': [11991, 11992],
               u'put-guarded-by-semaphore': True,
               u'timeouts': [None, None]},
     u'total': {}}
-> worker2: OK
    {u'autoscaler': {},
     u'consumer': {u'broker': {u'connect_timeout': 4,
                               u'hostname': u'maus.epcc.ed.ac.uk',
                               u'insist': False,
                               u'login_method': u'AMQPLAIN',
                               u'port': 5672,
                               u'ssl': False,
                               u'transport': u'amqp',
                               u'transport_options': {},
                               u'userid': u'maus',
                               u'virtual_host': u'maushost'},
                   u'prefetch_count': 8},
     u'pool': {u'max-concurrency': 2,
               u'max-tasks-per-child': None,
               u'processes': [21964, 21965],
               u'put-guarded-by-semaphore': True,
               u'timeouts': [None, None]},
     u'total': {}}
</pre>

From within Python, use:
<pre>
$ from celery.task.control import inspect
$ i = inspect()
$ i.stats()
{...}
</pre>

h3. Check registered tasks

Check the tasks that each worker can execute:
<pre>
$ celeryctl inspect registered
<- registered
-> worker1: OK
    * celery.backend_cleanup
    * celery.chord
    * celery.chord_unlock
    * celery.ping
    * mauscelery.maustasks.MausGenericTransformTask
-> worker2: OK
    * celery.backend_cleanup
    * celery.chord
    * celery.chord_unlock
    * celery.ping
    * mauscelery.maustasks.MausGenericTransformTask
</pre>

From within Python, use:
<pre>
$ from celery.task.control import inspect
$ i = inspect()
$ i.registered()
{u'worker1': [u'celery.backend_cleanup',
  u'celery.chord',
  u'celery.chord_unlock',
  u'celery.ping',
  u'mauscelery.maustasks.MausGenericTransformTask'],
 u'worker2': [u'celery.backend_cleanup',
  u'celery.chord',
  u'celery.chord_unlock',
  u'celery.ping',
  u'mauscelery.maustasks.MausGenericTransformTask']}
</pre>

h3. Check task states

Check the active, scheduled, reserved and revoked tasks:
<pre>
$ celeryctl inspect active
<- active
-> worker1: OK
    - empty -
-> worker2: OK
    - empty -
</pre>

<pre>
$ celeryctl inspect scheduled
<- scheduled
-> worker1: OK
    - empty -
-> worker2: OK
    - empty -
</pre>

<pre>
$ celeryctl inspect reserved
<- reserved
-> worker2: OK
    - empty -
-> worker1: OK
    - empty -
</pre>

From within Python, use:
<pre>
$ from celery.task.control import inspect
$ i = inspect()
$ i.active()
{u'worker1': [], u'worker2': []}

$ i.scheduled()
{u'worker1': [], u'worker2': []}

$ i.reserved()
{u'worker1': [], u'worker2': []}
</pre>

h3. Purge queued tasks

Purge tasks currently awaiting despatch from RabbitMQ. This can be useful if one or more workers have run into problems and you don't want the backlog of pending tasks to be processed after you restart them:
<pre>
$ celeryctl purge
Purged 4 messages from 1 known task queue.
</pre>

From within Python, use:
<pre>
$ from celery.task.control import discard_all
$ discard_all()
4
</pre>

h3. Shut down workers

All workers can be shut down from within Python via:

<pre>
$ from celery.task.control import broadcast
$ broadcast("shutdown")
</pre>

Each worker will complete the tasks it is currently processing before shutting down.

Alternatively, you can use the Linux @kill@ command e.g.:
<pre>
$ ps -a
12614 pts/6    00:00:02 celeryd
12627 pts/6    00:00:00 celeryd
12628 pts/6    00:00:00 celeryd
$ kill -s TERM 12614
</pre>
The process ID should be that of the main worker process, which will usually have the lowest process ID.

To kill the worker immediately, without waiting for currently processing tasks to complete, use:
<pre>
$ kill -s KILL 12614
</pre>

To kill all @celeryd@ processes, use:
<pre>
$ ps auxww | grep celeryd | awk '{print $2}' | xargs kill -9
</pre>

h2. MAUS-specific actions

These actions are supported by Celery workers running on top of MAUS.

h3. Get information on worker processes

Get a list of the worker node processes and their child processes (the child processes are responsible for executing jobs):

<pre>
$ from celery.task.control import broadcast
$ broadcast("get_process_pool", reply=True)
[{u'worker1': {u'master_name': u'MainProcess',
   u'master_pid': 12614,
   u'pool_pids': [12627, 12628]}}]
</pre>

The sub-process IDs correspond to those visible in the @processes@ field of the document returned by the @celeryctl inspect stats@ command, described in "Check worker configuration" above, and to those visible when running the Linux @ps -a@ command e.g.
<pre>
$ ps -a
12614 pts/6    00:00:02 celeryd
12627 pts/6    00:00:00 celeryd
12628 pts/6    00:00:00 celeryd
</pre>


h3. Get worker MAUS configuration

Get the current MAUS configuration known to the workers, and the transforms that the workers will execute:

<pre>
$ from celery.task.control import broadcast
$ broadcast("get_maus_configuration", reply=True)
[{u'worker1': {u'configuration': u'{}', u'transform': u'MapPyDoNothing'}}]
</pre>

Note that, for "transform", a list represents a MapPyGroup and nested lists represent nested MapPyGroups, as in the example below.
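
For example, a "transform" value like the following (using transform names that appear elsewhere on this page) would represent a MapPyGroup containing MapPyPrint followed by a nested MapPyGroup of two transforms:
<pre>
[u'MapPyPrint', [u'MapPyDoNothing', u'MapPyPrint']]
</pre>
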
h3. Update workers with a new MAUS configuration and/or transforms

The worker must be restarted (see "Restart the workers" below) before the new configuration and/or transforms are actually available for use in the worker.

Update a worker with a new MAUS configuration. Existing transforms have death then birth invoked with the new configuration:
<pre>
$ from celery.task.control import broadcast
$ broadcast("set_maus_configuration", arguments= \
{"configuration":"""{"TOFconversionFactor":0.01}"""}, reply=True)
[{u'worker1': {u'status': u'ok'}}]
</pre>

Update a worker with a new transform. Existing transforms have death invoked. Instances of the new transforms are created and the current MAUS configuration is passed into their birth methods:
<pre>
$ from celery.task.control import broadcast
$ broadcast("set_maus_configuration", arguments= \
{"transform":["MapPyPrint", "MapPyDoNothing"]}, reply=True)
[{u'worker1': {u'status': u'ok'}}]
</pre>

Update a worker with a new MAUS configuration and a new transform:
<pre>
$ from celery.task.control import broadcast
$ broadcast("set_maus_configuration", arguments= \
{"transform":["MapPyPrint", "MapPyDoNothing"], \
 "configuration":"""{"TOFconversionFactor":0.01}"""}, reply=True)
[{u'worker1': {u'status': u'ok'}}]
</pre>

h3. Restart the workers

To restart Celery workers, run:
<pre>
$ from celery.task.control import broadcast
$ broadcast("restart_pool", reply=True)
[{u'worker1': {u'status': u'ok'}}]
</pre>
This instructs each Celery worker to terminate its sub-processes and spawn new ones. The new sub-processes will inherit any updated configuration or transforms sent using @set_maus_configuration@ above.

In the @celeryd@ window you might see output like this:
<pre>
Consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
...
...
...
error: [Errno 4] Interrupted system call
</pre>
This can be ignored. If in doubt, you can use @get_maus_configuration@ above to check that the workers have the new configuration.
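
For example, after a restart you can verify that the update took effect. The output below is indicative, assuming the configuration and transforms set in "Update workers with a new MAUS configuration and/or transforms" above:
<pre>
$ from celery.task.control import broadcast
$ broadcast("get_maus_configuration", reply=True)
[{u'worker1': {u'configuration': u'{"TOFconversionFactor":0.01}',
   u'transform': [u'MapPyPrint', u'MapPyDoNothing']}}]
</pre>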