MAUSCelery » History » Version 27

Jackson, Mike, 15 March 2012 17:04

h1. Celery configuration and monitoring

{{>toc}}

MAUS can be used with the Celery asynchronous distributed task queue (http://celeryproject.org/) to allow transform (map) steps to be executed on multiple processors in parallel.

For full information on using Celery, see the Celery web site and:

* "Introduction":http://docs.celeryproject.org/en/latest/getting-started/introduction.html
* "Help resources":http://ask.github.com/celery/getting-started/resources.html
* "User doc":http://ask.github.com/celery/index.html
* "Source code":https://github.com/ask/celery/tree/master/celery

h2. Configure a node to run a Celery worker

To configure a node to run a Celery worker:

* Ensure MAUS is available on the node you want to use as a Celery worker.
** *Important note* MAUS sends configuration information to Celery workers. This can include absolute paths. Therefore, when deploying workers, please ensure that the *path to the MAUS deployment used by the Celery worker* is the *same* as the *path to the MAUS deployment within which you are running MAUS data analysis workflows*. This is a known issue - #918
* Run:
<pre>
$ source env.sh
</pre>
* If you have deployed RabbitMQ on a remote host:
** Edit @src/common_py/mauscelery/celeryconfig.py@
** Change
<pre>
BROKER_HOST = "localhost"
</pre>
to specify the full hostname of the host on which RabbitMQ was deployed e.g.
<pre>
BROKER_HOST = "maus.epcc.ed.ac.uk"
</pre>

h2. Start up a Celery worker

To start up a Celery worker, run:
<pre>
$ celeryd -l INFO -n WORKER_ID
</pre>
where @WORKER_ID@ is a unique ID for the worker. You should provide one, as it helps with monitoring when you are using many workers.

h3. Specify the number of sub-processes

The worker will set up a number of sub-processes, depending on the number of CPUs available to your node. You can explicitly set the number of sub-processes via the @-c@ flag e.g.
<pre>
$ celeryd -l INFO -n WORKER_ID -c 2
</pre>
You can specify as many sub-processes as you like, but exceeding the number of CPUs available may cause performance to suffer.
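The advice above can be followed programmatically, e.g. when writing a launcher script. A minimal sketch, assuming only the Python standard library (the helper name is illustrative, not part of MAUS or Celery):

```python
# Sketch: build a celeryd command line, defaulting the -c flag to the
# number of CPUs on the node, per the guidance above.
import multiprocessing

def celeryd_command(worker_id, processes=None):
    """Return a celeryd invocation string for the given worker ID."""
    if processes is None:
        # Default to one sub-process per CPU to avoid oversubscription.
        processes = multiprocessing.cpu_count()
    return "celeryd -l INFO -n %s -c %d" % (worker_id, processes)

print(celeryd_command("worker1", 2))
```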

h3. Purge queued tasks

If any tasks are held by RabbitMQ awaiting dispatch to workers then the worker, when started, will immediately start to process them. If you want to purge this queue before your worker starts (useful if one or more workers have run into problems and you don't want the backlog of pending tasks to be processed after you restart them), use the @--purge@ flag e.g.
<pre>
$ celeryd -l INFO -n WORKER_ID --purge
</pre>
See also "Purge queued tasks" below.

h3. Specify the logging level

The @-l@ flag specifies the logging level of the worker. The other options, aside from @INFO@, are @DEBUG@, @WARNING@, @ERROR@ and @CRITICAL@ e.g.
<pre>
$ celeryd -l DEBUG -n WORKER_ID
</pre>
The default logging format is:
<pre>
[%(asctime)s: %(levelname)s/%(processName)s] %(message)s
</pre>
You can specify another format in @src/common_py/mauscelery/celeryconfig.py@ using a @CELERYD_LOG_FORMAT@ variable. See the Python @logging@ module and http://ask.github.com/celery/configuration.html#logging for more on logging.
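For example, a @celeryconfig.py@ fragment overriding the format might look like the following (this particular format string is illustrative, not the MAUS default):

```python
# Fragment for src/common_py/mauscelery/celeryconfig.py - override the
# default worker log format. The placeholders are the standard Python
# logging ones; %(processName)s identifies the Celery sub-process.
CELERYD_LOG_FORMAT = \
    "[%(asctime)s: %(levelname)s/%(processName)s] [%(name)s] %(message)s"
```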

h2. Initialise the worker to execute MAUS tasks

By default, Celery workers use the default MAUS configuration and apply the MapPyDoNothing transform. They need to be explicitly configured to apply other transforms.

If you are running analyses using a client that uses Go.py then this configuration will be done by Go.py on your behalf.

If you are not using Go.py then you can do this manually (or in your own Python code) using the MAUS-specific commands described below in "Birth the worker transforms".
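Once a worker is configured, individual spills can be submitted to it by name. A minimal sketch, assuming Celery 2.x's @send_task@ API; the argument layout (spill document, client ID, spill number) is an assumption based on the @inspect active@ dumps shown later, so treat it as illustrative:

```python
# Sketch: submit a spill to a configured MAUS Celery worker without
# Go.py. The task name is the one reported by "inspect registered";
# the positional argument layout is an assumption, not confirmed API.
import json

def make_task_args(spill_doc, client_id, spill_number):
    """Build the assumed positional arguments for MausGenericTransformTask."""
    return (json.dumps(spill_doc), client_id, spill_number)

if __name__ == "__main__":
    from celery.execute import send_task  # Celery 2.x API
    args = make_task_args({"daq_event_type": "physics_event"}, "client1", 0)
    result = send_task("mauscelery.maustasks.MausGenericTransformTask",
                       args=list(args))
    # Blocks until the worker returns the transformed spill.
    print(result.get())
```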

h2. Monitoring and management

Celery provides a number of commands to allow inspection of Celery workers. Many of these can be invoked from the command line, via the @celeryctl@ command, or from within Python. For full information, see http://ask.github.com/celery/userguide/monitoring.html

In the following, the names shown, e.g. @worker1@, @worker2@ etc., are the worker names specified with the @-n@ flag when starting the workers using @celeryd@.

In the invocations of the Celery @inspect@ command in Python, a specific Celery worker can be specified e.g.
<pre>
i = inspect("worker1")
</pre>
where the worker name is that given to the @-n@ flag when @celeryd@ is invoked to start the worker.

h3. Check live workers

Check for live Celery workers that have registered with RabbitMQ and are available for use:
<pre>
$ celeryctl status
worker1: OK
worker2: OK

2 nodes online.
</pre>

<pre>
$ celeryctl inspect ping
<- ping
-> worker1: OK
    pong
-> worker2: OK
    pong
</pre>

To target a specific worker, use @-d@ e.g.:
<pre>
$ celeryctl inspect -d worker1 ping
<- ping
-> worker1: OK
</pre>

From within Python, use:
<pre>
$ from celery.task.control import inspect
$ i = inspect()
$ i.ping()
{u'worker1': u'pong', u'worker2': u'pong'}
</pre>

h3. Check worker configuration

<pre>
$ celeryctl inspect stats
<- stats
-> worker1: OK
    {u'autoscaler': {},
     u'consumer': {u'broker': {u'connect_timeout': 4,
                               u'hostname': u'127.0.0.1',
                               u'insist': False,
                               u'login_method': u'AMQPLAIN',
                               u'port': 5672,
                               u'ssl': False,
                               u'transport': u'amqp',
                               u'transport_options': {},
                               u'userid': u'maus',
                               u'virtual_host': u'maushost'},
                   u'prefetch_count': 8},
     u'pool': {u'max-concurrency': 2,
               u'max-tasks-per-child': None,
               u'processes': [11991, 11992],
               u'put-guarded-by-semaphore': True,
               u'timeouts': [None, None]},
     u'total': {}}
-> worker2: OK
    {u'autoscaler': {},
     u'consumer': {u'broker': {u'connect_timeout': 4,
                               u'hostname': u'maus.epcc.ed.ac.uk',
                               u'insist': False,
                               u'login_method': u'AMQPLAIN',
                               u'port': 5672,
                               u'ssl': False,
                               u'transport': u'amqp',
                               u'transport_options': {},
                               u'userid': u'maus',
                               u'virtual_host': u'maushost'},
                   u'prefetch_count': 8},
     u'pool': {u'max-concurrency': 2,
               u'max-tasks-per-child': None,
               u'processes': [21964, 21965],
               u'put-guarded-by-semaphore': True,
               u'timeouts': [None, None]},
     u'total': {}}
</pre>

From within Python, use:
<pre>
$ from celery.task.control import inspect
$ i = inspect()
$ i.stats()
{...}
</pre>

h3. Check registered tasks

Check the tasks that each worker can execute:

<pre>
$ celeryctl inspect registered
<- registered
-> worker1: OK
    * celery.backend_cleanup
    * celery.chord
    * celery.chord_unlock
    * celery.ping
    * mauscelery.maustasks.MausGenericTransformTask
-> worker2: OK
    * celery.backend_cleanup
    * celery.chord
    * celery.chord_unlock
    * celery.ping
    * mauscelery.maustasks.MausGenericTransformTask
</pre>

From within Python, use:
<pre>
$ from celery.task.control import inspect
$ i = inspect()
$ i.registered()
{u'worker1': [u'celery.backend_cleanup',
  u'celery.chord',
  u'celery.chord_unlock',
  u'celery.ping',
  u'mauscelery.maustasks.MausGenericTransformTask'],
 u'worker2': [u'celery.backend_cleanup',
  u'celery.chord',
  u'celery.chord_unlock',
  u'celery.ping',
  u'mauscelery.maustasks.MausGenericTransformTask']}
</pre>

h3. Check task states

Check the tasks submitted for execution to a Celery worker by a client.

Check the tasks currently being executed by the worker:
<pre>
$ celeryctl inspect active
<- active
-> worker1: OK
    * {u'args': u'(\'{MAUS_SPILL_DOCUMENT}\', \'maus.epcc.ed.ac.uk (13067)\', 13)', 
u'time_start':1327503329.679438, u'name':u'mauscelery.maustasks.MausGenericTransformTask', 
u'delivery_info':{u'consumer_tag': u'2', u'routing_key': u'celery', u'exchange':u'celery'}, 
u'hostname': u'worker1', u'acknowledged': True, u'kwargs':u'{}', 
u'id': u'7222138d-bb2d-4e1b-ba70-5c0f9e90aa08', u'worker_pid':13059}
    * {...}
...
</pre>
Note the @worker_pid@, which specifies the process ID of the Celery sub-process executing the task.

Check the tasks received by the worker but awaiting execution:
<pre>
$ celeryctl inspect reserved
<- reserved
-> worker1: OK
    * {u'args': u'(\'{MAUS_SPILL_DOCUMENT}\', \'maus.epcc.ed.ac.uk (13067)\', 95)', 
u'time_start': None, u'name': u'mauscelery.maustasks.MausGenericTransformTask',
u'delivery_info': {u'consumer_tag': u'2', u'routing_key': u'celery', u'exchange': u'celery'}, 
u'hostname': u'worker1', u'acknowledged': False, u'kwargs': u'{}', 
u'id': u'ee1b3a88-58cc-4e26-b77d-4424ec9161d1', u'worker_pid': None}
    * {...}
...
</pre>
Note that @worker_pid@, the process ID of the Celery sub-process that will execute the task, is currently @None@.

From within Python, use:
<pre>
$ from celery.task.control import inspect
$ i = inspect()
$ i.active()
{u'worker1': [...], ...}

$ i.reserved()
{u'worker1': [...], ...}
</pre>
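The per-worker dictionaries returned by @active()@ can be summarised locally, e.g. to watch the load across the pool. A minimal sketch; the counting helper is illustrative, only the @inspect@ call is Celery API:

```python
# Sketch: count tasks currently executing across all workers, using the
# dictionaries returned by inspect().active() as shown above.
def count_tasks(tasks_by_worker):
    """tasks_by_worker maps worker name -> list of task dicts.

    inspect().active() returns None when no workers respond, so treat
    an empty or missing mapping as zero tasks.
    """
    if not tasks_by_worker:
        return 0
    return sum(len(tasks) for tasks in tasks_by_worker.values())

if __name__ == "__main__":
    from celery.task.control import inspect
    print(count_tasks(inspect().active()))
```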

h3. Get results

When a task is submitted to a Celery worker, a unique task ID is generated. Using this ID you can collect the results of a task, provided you have not collected them already, using @celeryctl@ e.g.

<pre>
$ celeryctl result 95b3c56e-bea9-433d-aecd-920601c5ad20
{"digits": [{"tdc_counts": 1, "channel_id": {"fiber_number": 106, "tracker_numbe
r": 0, "type": "Tracker", "station_number": 1, "plane_number": 2}, "adc_counts":
...
</pre>

*Warning* Results can only be collected once. If you do this when running an analysis it may cause problems with your client, which may wait forever to receive results that you have already collected. This command should only be used for debugging or to establish why things are going wrong, for example if the Celery worker needs to be terminated and you want to retrieve its results from RabbitMQ.
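The same collection can be done from Python via Celery's @AsyncResult@. A minimal sketch; the UUID sanity check is a local helper of my own, not part of MAUS:

```python
# Sketch: collect a task result from Python instead of celeryctl.
# AsyncResult is Celery API; is_task_id is an illustrative local check.
import uuid

def is_task_id(candidate):
    """Return True if candidate looks like a Celery task ID (a UUID)."""
    try:
        uuid.UUID(candidate)
        return True
    except ValueError:
        return False

if __name__ == "__main__":
    from celery.result import AsyncResult
    task_id = "95b3c56e-bea9-433d-aecd-920601c5ad20"
    assert is_task_id(task_id)
    # get() consumes the result - remember it can only be collected once.
    print(AsyncResult(task_id).get())
```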

h3. Purge queued tasks

To purge tasks currently awaiting dispatch from RabbitMQ, run the following. This can be useful if one or more workers have run into problems and you don't want the backlog of pending tasks to be processed after you restart them.

<pre>
$ celeryctl purge
Purged 4 messages from 1 known task queue.
</pre>

From within Python, use:
<pre>
$ from celery.task.control import discard_all
$ discard_all()
4
</pre>

h3. Shut down workers

All workers can be shut down from within Python via:

<pre>
$ from celery.task.control import broadcast
$ broadcast("shutdown")
</pre>

Each worker will complete the tasks it is currently processing before shutting down.

Alternatively, you can use the Linux @kill@ command e.g.:
<pre>
$ ps -a
12614 pts/6    00:00:02 celeryd
12627 pts/6    00:00:00 celeryd
12628 pts/6    00:00:00 celeryd
$ kill -s TERM 12614
</pre>
The process ID should be that of the main worker process, which will usually have the lowest process ID.

To kill the worker *immediately*, without waiting for currently-executing tasks to complete, use:
<pre>
$ kill -s KILL 12614
</pre>

To kill all @celeryd@ processes, use:
<pre>
$ ps auxww | grep celeryd | awk '{print $2}' | xargs kill -9
</pre>

h2. MAUS-specific monitoring and management

These actions are supported by Celery workers running on top of MAUS.

MAUS tasks support dynamic reconfiguration of Celery workers. When executing a client that uses @Go.py@, the MAUS configuration and a specification of the client's transforms (or mappers) are broadcast to the Celery workers, which automatically reconfigure themselves, create the transforms and birth them.

The following custom MAUS broadcast commands are supported to allow querying and updating of the worker nodes. These are used within @Go.py@, so you won't normally need to invoke them yourself, but they may prove useful for debugging or out of curiosity.

h3. Get worker MAUS configuration

Get the current MAUS configuration known to the workers, and the transforms that the workers will execute:

<pre>
$ from celery.task.control import broadcast
$ broadcast("get_maus_configuration", reply=True)
[{u'worker1': {u'config_id': 123, u'configuration': u'{}', u'transform': u'MapPyDoNothing'}}]
</pre>

where:

* @config_id@ - a unique ID for the current worker configuration.
* @configuration@ - the worker's MAUS configuration as forwarded by a client.
* @transform@ - the transforms that the worker will apply to a spill. A list represents a MapPyGroup and nested lists represent nested MapPyGroups.

h3. Birth the worker transforms with a MAUS configuration

Update a worker with a new MAUS configuration and transforms. Existing transforms have death invoked on them. The new transforms are then created and birth is invoked upon them with the new configuration.
<pre>
$ from celery.task.control import broadcast
$ broadcast("birth", arguments={"configuration":"""{"TOFconversionFactor":0.01}""", \
    "transform":"MapPyDoNothing", \
    "config_id":123}, reply=True)
[{u'maus.epcc.ed.ac.uk': {u'status': u'ok'}}]
</pre>

where:

* @config_id@ - a unique ID for the worker configuration. For the update to be applied, this must differ from the ID currently held by the workers, as returned by @get_maus_configuration@ above.
* @configuration@ - the MAUS configuration.
* @transform@ - the transforms that the worker will apply to a spill. A list represents a MapPyGroup and nested lists represent nested MapPyGroups.
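When building these arguments in your own code, it is worth validating the configuration string before broadcasting it, since a malformed document would otherwise only fail on the workers. A minimal sketch; the helper name is illustrative, not part of MAUS:

```python
# Sketch: assemble the arguments for a "birth" broadcast, rejecting a
# configuration that is not well-formed JSON before anything is sent.
import json

def birth_arguments(transform, configuration, config_id):
    """Build the arguments dict for broadcast("birth", ...)."""
    json.loads(configuration)  # raises ValueError if not valid JSON
    return {"configuration": configuration,
            "transform": transform,
            "config_id": config_id}

# e.g. broadcast("birth", arguments=birth_arguments(
#          "MapPyDoNothing", '{"TOFconversionFactor":0.01}', 124),
#      reply=True)
```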

If using MapCppSimulation, birth can take a while and there is a risk that the broadcast will return before all workers have reported their status. To avoid this, run:

<pre>
# Get the current number of workers
$ from celery.task.control import inspect, broadcast
$ i = inspect()
$ num_workers = len(i.active())
$ print num_workers
$ results = broadcast("birth", arguments={"transform":"MapCppSimulation", "configuration":json_config_doc, "config_id": 1234}, \
  reply=True, timeout=300, limit=num_workers)
$ print results
</pre>

where

* @timeout@ is the maximum time in seconds to wait for any worker to respond.
* @limit@ is the number of workers to wait for replies from before exiting.

This ensures that broadcast will wait up to 300s for responses from all the workers, but if all the workers reply promptly it exits there and then, without waiting the full 300s.

h3. Death the worker transforms

Invoke death on the workers' transforms:
<pre>
$ from celery.task.control import broadcast
$ broadcast("death", reply=True)
[{u'maus.epcc.ed.ac.uk': {u'status': u'ok'}}]
</pre>

If @death@ is called twice, the second call does nothing.