h1. Celery configuration and monitoring

{{>toc}}

MAUS can be used with the Celery asynchronous distributed task queue (http://celeryproject.org/) to allow transform (map) steps to be executed on multiple processors in parallel.

For full information on using Celery, see the Celery web site and:

* "Introduction":http://docs.celeryproject.org/en/latest/getting-started/introduction.html
* "Help resources":http://ask.github.com/celery/getting-started/resources.html
* "User doc":http://ask.github.com/celery/index.html
* "Source code":https://github.com/ask/celery/tree/master/celery

h2. Configure a node to run a Celery worker

To configure a node to run a Celery worker:

* Ensure MAUS is available on the node you want to use as a Celery worker.
* Run:
<pre>
$ source env.sh
</pre>
* If you have deployed RabbitMQ on a remote host,
** Edit @src/common_py/mauscelery/celeryconfig.py@
** Change
<pre>
BROKER_HOST = "localhost"
</pre>
to specify the full hostname of the host on which RabbitMQ was deployed e.g.
<pre>
BROKER_HOST = "maus.epcc.ed.ac.uk"
</pre>
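
If the RabbitMQ broker also uses a non-default port, user, password or virtual host, the corresponding settings can be overridden in the same file. A minimal sketch, assuming the broker settings shown in the worker stats later on this page (the password is a placeholder):
<pre>
BROKER_HOST = "maus.epcc.ed.ac.uk"
BROKER_PORT = 5672
BROKER_USER = "maus"
BROKER_PASSWORD = "password"  # placeholder - use the password configured in RabbitMQ
BROKER_VHOST = "maushost"
</pre>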

h2. Start up a Celery worker

To start up a Celery worker, run:
<pre>
$ celeryd -l INFO -n WORKER_ID
</pre>
where @WORKER_ID@ is a unique ID for the worker. You should provide one, as it helps with monitoring when you are running many workers.

h3. Specify the number of sub-processes

The worker will set up a number of sub-processes, depending on the number of CPUs available to your node. You can explicitly set the number of sub-processes via the @-c@ flag e.g.
<pre>
$ celeryd -l INFO -n WORKER_ID -c 2
</pre>

You can specify as many sub-processes as you like, but exceeding the number of CPUs available may cause performance to suffer.

h3. Purge queued tasks

If any tasks are held by RabbitMQ awaiting despatch to workers, the worker will start to process them as soon as it starts. If you want to purge this queue before your worker starts (useful if one or more workers have run into problems and you don't want the backlog of pending tasks to be processed after you restart them), use the @--purge@ flag e.g.
<pre>
$ celeryd -l INFO -n WORKER_ID --purge
</pre>

See also "Purge queued tasks" below.

h3. Specify the logging level

The @-l@ flag specifies the logging level of the worker. The other options, aside from @INFO@, are @DEBUG@, @WARNING@, @ERROR@ and @CRITICAL@ e.g.
<pre>
$ celeryd -l DEBUG -n WORKER_ID
</pre>

The default logging format is:
<pre>
[%(asctime)s: %(levelname)s/%(processName)s] %(message)s
</pre>
You can specify another format in @src/common_py/mauscelery/celeryconfig.py@ using a @CELERYD_LOG_FORMAT@ variable. See the Python @logging@ module and http://ask.github.com/celery/configuration.html#logging for more on logging.
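
For example, a minimal sketch of overriding the format in @celeryconfig.py@ (the extra @%(name)s@ field is a standard Python @logging@ attribute, shown purely as an illustration):
<pre>
CELERYD_LOG_FORMAT = "[%(asctime)s: %(levelname)s/%(processName)s] %(name)s - %(message)s"
</pre>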

h2. Initialise the worker to execute MAUS tasks

By default, Celery workers use the default MAUS configuration and apply the MapPyDoNothing transform. They need to be explicitly configured to apply other transformations.

If you are running analyses using a client that uses Go.py then this configuration will be done by Go.py on your behalf.

If you are not using Go.py then you can do this manually (or in your own Python code) using the MAUS-specific commands described below in "Birth the worker transforms".

h2. Monitoring and management

Celery provides a number of commands to allow inspection of Celery workers. Many of these can be invoked from the command line, via the @celeryctl@ command, or from within Python. For full information, see http://ask.github.com/celery/userguide/monitoring.html

In the following, the names shown e.g. @worker1@, @worker2@ etc. are the worker names specified with the @-n@ flag when starting the workers using @celeryd@.

In the invocations of the Celery @inspect@ command in Python, a specific Celery worker can be specified e.g.
<pre>
i = inspect("worker1")
</pre>
where the worker name is that given to the @-n@ flag when @celeryd@ is invoked to start the worker.

h3. Check live workers

Check for live Celery workers that have registered with RabbitMQ and are available for use:
<pre>
$ celeryctl status
worker1: OK
worker2: OK

2 nodes online.
</pre>

<pre>
$ celeryctl inspect ping
<- ping
-> worker1: OK
    pong
-> worker2: OK
    pong
</pre>

To specify a specific worker, use @-d@ e.g.:
<pre>
$ celeryctl inspect -d worker1 ping
<- ping
-> worker1: OK
</pre>

From within Python, use:
<pre>
$ from celery.task.control import inspect
$ i = inspect()
$ i.ping()
{u'worker1': u'pong', u'worker2': u'pong'}
</pre>

h3. Check worker configuration

Check each worker's broker and pool configuration:
<pre>
$ celeryctl inspect stats
<- stats
-> worker1: OK
    {u'autoscaler': {},
     u'consumer': {u'broker': {u'connect_timeout': 4,
                               u'hostname': u'127.0.0.1',
                               u'insist': False,
                               u'login_method': u'AMQPLAIN',
                               u'port': 5672,
                               u'ssl': False,
                               u'transport': u'amqp',
                               u'transport_options': {},
                               u'userid': u'maus',
                               u'virtual_host': u'maushost'},
                   u'prefetch_count': 8},
     u'pool': {u'max-concurrency': 2,
               u'max-tasks-per-child': None,
               u'processes': [11991, 11992],
               u'put-guarded-by-semaphore': True,
               u'timeouts': [None, None]},
     u'total': {}}
-> worker2: OK
    {u'autoscaler': {},
     u'consumer': {u'broker': {u'connect_timeout': 4,
                               u'hostname': u'maus.epcc.ed.ac.uk',
                               u'insist': False,
                               u'login_method': u'AMQPLAIN',
                               u'port': 5672,
                               u'ssl': False,
                               u'transport': u'amqp',
                               u'transport_options': {},
                               u'userid': u'maus',
                               u'virtual_host': u'maushost'},
                   u'prefetch_count': 8},
     u'pool': {u'max-concurrency': 2,
               u'max-tasks-per-child': None,
               u'processes': [21964, 21965],
               u'put-guarded-by-semaphore': True,
               u'timeouts': [None, None]},
     u'total': {}}
</pre>

From within Python, use:
<pre>
$ from celery.task.control import inspect
$ i = inspect()
$ i.stats()
{...}
</pre>

h3. Check registered tasks

Check the tasks that each worker can execute.

<pre>
$ celeryctl inspect registered
<- registered
-> worker1: OK
    * celery.backend_cleanup
    * celery.chord
    * celery.chord_unlock
    * celery.ping
    * mauscelery.maustasks.MausGenericTransformTask
-> worker2: OK
    * celery.backend_cleanup
    * celery.chord
    * celery.chord_unlock
    * celery.ping
    * mauscelery.maustasks.MausGenericTransformTask
</pre>

From within Python, use:
<pre>
$ from celery.task.control import inspect
$ i = inspect()
$ i.registered()
{u'worker1': [u'celery.backend_cleanup',
  u'celery.chord',
  u'celery.chord_unlock',
  u'celery.ping',
  u'mauscelery.maustasks.MausGenericTransformTask'],
 u'worker2': [u'celery.backend_cleanup',
  u'celery.chord',
  u'celery.chord_unlock',
  u'celery.ping',
  u'mauscelery.maustasks.MausGenericTransformTask']}
</pre>

h3. Check task states

Check the tasks submitted for execution to a Celery worker by a client.

Check the tasks currently being executed by the worker:
<pre>
$ celeryctl inspect active
<- active
-> worker1: OK
    * {u'args': u'(\'{MAUS_SPILL_DOCUMENT}\', \'maus.epcc.ed.ac.uk (13067)\', 13)',
u'time_start':1327503329.679438, u'name':u'mauscelery.maustasks.MausGenericTransformTask',
u'delivery_info':{u'consumer_tag': u'2', u'routing_key': u'celery', u'exchange':u'celery'},
u'hostname': u'worker1', u'acknowledged': True, u'kwargs':u'{}',
u'id': u'7222138d-bb2d-4e1b-ba70-5c0f9e90aa08', u'worker_pid':13059}
    * {...}
...
</pre>
Note the @worker_pid@, which specifies the process ID of the Celery sub-process executing the task.

Check the tasks received by the worker but awaiting execution:
<pre>
$ celeryctl inspect reserved
<- reserved
-> worker1: OK
    * {u'args': u'(\'{MAUS_SPILL_DOCUMENT}\', \'maus.epcc.ed.ac.uk (13067)\', 95)',
u'time_start': None, u'name': u'mauscelery.maustasks.MausGenericTransformTask',
u'delivery_info': {u'consumer_tag': u'2', u'routing_key': u'celery', u'exchange': u'celery'},
u'hostname': u'worker1', u'acknowledged': False, u'kwargs': u'{}',
u'id': u'ee1b3a88-58cc-4e26-b77d-4424ec9161d1', u'worker_pid': None}
    * {...}
...
</pre>
Note that @worker_pid@, the process ID of the Celery sub-process executing the task, is currently @None@ because execution has not yet started.

From within Python, use:
<pre>
$ from celery.task.control import inspect
$ i = inspect()
$ i.active()
{u'worker1': [...], ...}

$ i.reserved()
{u'worker1': [...], ...}
</pre>

h3. Get results

When a task is submitted to a Celery worker, a unique task ID is generated. Using this ID you can collect the results of a task, provided you have not collected them already, using @celeryctl@ e.g.

<pre>
$ celeryctl result 95b3c56e-bea9-433d-aecd-920601c5ad20
{"digits": [{"tdc_counts": 1, "channel_id": {"fiber_number": 106, "tracker_numbe
r": 0, "type": "Tracker", "station_number": 1, "plane_number": 2}, "adc_counts":
...
</pre>
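
The result can also be fetched from within Python given the task ID, using Celery's @AsyncResult@ class. A minimal sketch (the task ID is illustrative, and the collect-once warning below applies equally):
<pre>
$ from celery.result import AsyncResult
$ result = AsyncResult("95b3c56e-bea9-433d-aecd-920601c5ad20")
$ result.get()
...
</pre>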

*Warning* Results can only be collected once. If you do this while running an analysis it may cause problems for your client, which may wait to receive results that you have already collected. This command should only be used for debugging or to establish why things are going wrong, for example if the Celery worker needs to be terminated and you want to recover results from RabbitMQ.

h3. Purge queued tasks

Purge tasks currently awaiting dispatch from RabbitMQ. This can be useful if one or more workers have run into problems and you don't want the backlog of pending tasks to be processed after you restart them.

<pre>
$ celeryctl purge
Purged 4 messages from 1 known task queue.
</pre>

From within Python, use:
<pre>
$ from celery.task.control import discard_all
$ discard_all()
4
</pre>

h3. Shut down workers

All workers can be shut down from within Python via:

<pre>
$ from celery.task.control import broadcast
$ broadcast("shutdown")
</pre>

Each worker will complete the tasks it is currently processing before shutting down.

Alternatively, you can use the Linux @kill@ command e.g.:
<pre>
$ ps -a
12614 pts/6    00:00:02 celeryd
12627 pts/6    00:00:00 celeryd
12628 pts/6    00:00:00 celeryd
$ kill -s TERM 12614
</pre>
The process ID should be that of the main worker process. This will usually have the lowest process ID.

To kill the worker *immediately*, without waiting for currently processing tasks to complete, use:
<pre>
$ kill -s KILL 12614
</pre>

To kill all @celeryd@ processes, use:
<pre>
$ ps auxww | grep celeryd | awk '{print $2}' | xargs kill -9
</pre>

h2. MAUS-specific monitoring and management

These actions are supported by Celery workers running on top of MAUS.

MAUS tasks support dynamic reconfiguration of Celery workers. When executing a client that uses @Go.py@, the MAUS configuration and a specification of the client's transforms (or mappers) are broadcast to the Celery workers, which automatically reconfigure themselves, create the transforms and birth them.

The following custom MAUS broadcast commands are supported to allow querying and updating of the worker nodes. They are used within @Go.py@, so you won't normally need them, but they may prove useful for debugging or out of curiosity. As a pre-requisite you must run the following import within Python (it is used by all of the examples below):
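<pre>
$ from celery.task.control import broadcast
</pre>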

h3. Get worker MAUS configuration

Get the current MAUS configuration known to the workers, and the transforms that the workers will execute:

<pre>
$ from celery.task.control import broadcast
$ broadcast("get_maus_configuration", reply=True)
[{u'worker1': {u'config_id': 123, u'configuration': u'{}', u'transform': u'MapPyDoNothing'}}]
</pre>

where:

* @config_id@ - a unique ID for the current worker configuration.
* @configuration@ - the worker's MAUS configuration as forwarded by a client.
* @transform@ - the transforms that the worker will apply to a spill. A list represents a MapPyGroup and nested lists represent nested MapPyGroups.

h3. Birth the worker transforms with a MAUS configuration

Update a worker with a new MAUS configuration and transforms. Death is invoked on the existing transforms, then the new transforms are created and birth is invoked on them with the new configuration.
<pre>
$ from celery.task.control import broadcast
$ broadcast("birth", arguments={"configuration":"""{"TOFconversionFactor":0.01}""", \
    "transform":"MapPyDoNothing", \
    "config_id":123}, reply=True)
[{u'maus.epcc.ed.ac.uk': {u'status': u'ok'}}]
</pre>

where:

* @config_id@ - a unique ID for the worker configuration. For the update to be applied this must be different to that currently held by the workers and returned by @get_maus_configuration@ above.
* @configuration@ - the MAUS configuration.
* @transform@ - the transforms that the worker will apply to a spill. A list represents a MapPyGroup and nested lists represent nested MapPyGroups (see the sketch after this list).
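
For example, a sketch of a @birth@ broadcast that specifies a nested group of transforms (the transform names and configuration value are purely illustrative; substitute the mappers used by your own analysis):
<pre>
$ from celery.task.control import broadcast
$ broadcast("birth", arguments={"configuration":"""{"TOFconversionFactor":0.005}""", \
    "transform":["MapPyBeamMaker", ["MapCppSimulation", "MapPyDoNothing"]], \
    "config_id":124}, reply=True)
[{u'maus.epcc.ed.ac.uk': {u'status': u'ok'}}]
</pre>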

If you are using MapCppSimulation, birth can take a while and there is a risk that the broadcast will return before the workers have replied. To avoid this, run:

<pre>
# Get the current number of workers.
$ from celery.task.control import broadcast, inspect
$ i = inspect()
$ num_workers = len(i.active())
$ print num_workers
# json_config_doc is your MAUS configuration document (a JSON string).
$ results = broadcast("birth", arguments={"transform":"MapCppSimulation", "configuration":json_config_doc, "config_id": 1234}, \
  reply=True, timeout=300, limit=num_workers)
$ print results
</pre>

where:

* @timeout@ is the maximum time in seconds to wait for any worker to respond.
* @limit@ is the number of worker replies to wait for before returning.

This ensures that broadcast will wait up to 300s for responses from all the workers, but if all the workers reply promptly it returns there and then, without waiting the full 300s.

h3. Death the worker transforms

Invoke death on the workers' transforms:
<pre>
$ from celery.task.control import broadcast
$ broadcast("death", reply=True)
[{u'maus.epcc.ed.ac.uk': {u'status': u'ok'}}]
</pre>

If @death@ is called twice, the second call does nothing.