
h1. Celery configuration and monitoring

{{>toc}}

MAUS can be used with the Celery asynchronous distributed task queue (http://celeryproject.org/) to allow transform (map) steps to be executed on multiple processors in parallel.

For full information on using Celery, see the Celery web site and:

* "Introduction":http://docs.celeryproject.org/en/latest/getting-started/introduction.html
* "Help resources":http://ask.github.com/celery/getting-started/resources.html
* "User doc":http://ask.github.com/celery/index.html
* "Source code":https://github.com/ask/celery/tree/master/celery

h2. Configure a node to run a Celery worker

To configure a node to run a Celery worker:

* Ensure MAUS is available on the node you want to use as a Celery worker.
** *Important note* MAUS sends configuration information to Celery workers. This can include absolute paths. Therefore, when deploying workers, please ensure that the *path to the MAUS deployment used by the Celery worker* is the *same* as the *path to the MAUS deployment within which you are running MAUS data analysis workflows*. This is a known issue - #918
* Run:
<pre>
$ source env.sh
</pre>
* If you have deployed RabbitMQ on a remote host:
** Edit @src/common_py/mauscelery/celeryconfig.py@
** Change
<pre>
BROKER_HOST = "localhost"
</pre>
to specify the full hostname of the host on which RabbitMQ was deployed e.g.
<pre>
BROKER_HOST = "maus.epcc.ed.ac.uk"
</pre>
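
As a rough guide, here is a minimal sketch of how the broker settings in @src/common_py/mauscelery/celeryconfig.py@ might look after editing. The user, password and virtual host values are illustrative assumptions; use the values from your own RabbitMQ setup:

<pre>
# Sketch of the broker settings in src/common_py/mauscelery/celeryconfig.py.
# All values below are assumptions for illustration only.
BROKER_HOST = "maus.epcc.ed.ac.uk"  # host on which RabbitMQ was deployed
BROKER_PORT = 5672                  # default RabbitMQ port
BROKER_USER = "maus"                # RabbitMQ user (hypothetical)
BROKER_PASSWORD = "secret"          # RabbitMQ password (hypothetical)
BROKER_VHOST = "maushost"           # RabbitMQ virtual host (hypothetical)
</pre>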

h2. Start up a Celery worker

To start up a Celery worker, run:
<pre>
$ celeryd -l INFO -n WORKER_ID
</pre>
where @WORKER_ID@ is a unique ID for the worker. You should provide one, as it helps with monitoring when you are using many workers.

h3. Specify the number of sub-processes

The worker will set up a number of sub-processes, depending on the number of CPUs available to your node. You can explicitly set the number of sub-processes via the @-c@ flag e.g.
<pre>
$ celeryd -l INFO -n WORKER_ID -c 2
</pre>

You can specify as many sub-processes as you like, but exceeding the number of CPUs available may cause performance to suffer.

h3. Purge queued tasks

If any tasks are held by RabbitMQ awaiting despatch to workers, then the worker, when started, will immediately start to process them. If you want to purge this queue before your worker starts (useful if one or more workers have run into problems and you don't want the backlog of pending tasks to be processed after you restart them), use the @--purge@ flag e.g.
<pre>
$ celeryd -l INFO -n WORKER_ID --purge
</pre>

See also "Purge queued tasks" below.

h3. Specify the logging level

The @-l@ flag specifies the logging level of the worker. The other options, aside from @INFO@, are @DEBUG@, @WARNING@, @ERROR@ and @CRITICAL@ e.g.
<pre>
$ celeryd -l DEBUG -n WORKER_ID
</pre>

The default logging format is:
<pre>
[%(asctime)s: %(levelname)s/%(processName)s] %(message)s
</pre>
You can specify another format in @src/common_py/mauscelery/celeryconfig.py@ using a @CELERYD_LOG_FORMAT@ variable. See the Python @logging@ module and http://ask.github.com/celery/configuration.html#logging for more on logging.
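For example, a minimal sketch of an alternative format (the format string here is illustrative; any Python @logging@ format fields can be used):

<pre>
# In src/common_py/mauscelery/celeryconfig.py - an illustrative format
# that adds the logger name to each message.
CELERYD_LOG_FORMAT = "%(asctime)s %(levelname)s %(name)s %(processName)s %(message)s"
</pre>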

h2. Beware resizing the Celery worker xterm

Resizing the Celery worker xterm causes a Celery worker's sub-processes to die and new ones to be created. This may cause complications for any currently-running jobs. It is unclear why this arises (it is not a MAUS-specific bug).

h2. Initialise the worker to execute MAUS tasks

By default, Celery workers use the default MAUS configuration and apply the @MapPyDoNothing@ transform. They need to be explicitly configured to apply other transforms.

MAUS tasks support dynamic reconfiguration of Celery workers. When executing a client that uses @Go.py@, the MAUS configuration and a specification of the client's transforms (or mappers) are broadcast to the Celery workers, which automatically reconfigure themselves, create the transforms and birth them.

To support this, Celery workers support custom MAUS broadcast commands that allow querying and updating of the worker nodes. These are used within @Go.py@, so you won't normally need to worry about them, but they may prove useful for debugging or out of curiosity. If so, please see "MAUS-specific monitoring and management" below.

h2. Monitoring and management

Celery provides a number of commands to allow inspection of Celery workers. Many of these can be invoked from the command line, via the @celeryctl@ command, or from within Python. For full information, see http://ask.github.com/celery/userguide/monitoring.html

In the following, the names shown e.g. @worker1@, @worker2@ etc. are the worker names specified with the @-n@ flag when starting the workers using @celeryd@.

In the invocations of the Celery @inspect@ command in Python, a specific Celery worker can be specified e.g.
<pre>
>>> i = inspect("worker1")
</pre>
where the worker name is that given to the @-n@ flag when @celeryd@ is invoked to start the worker.

h3. Check live workers

Check for live Celery workers that have registered with RabbitMQ and are available for use:
<pre>
$ celeryctl status
worker1: OK
worker2: OK

2 nodes online.
</pre>

<pre>
$ celeryctl inspect ping
<- ping
-> worker1: OK
    pong
-> worker2: OK
    pong
</pre>

To specify a specific worker, use @-d@ e.g.:
<pre>
$ celeryctl inspect -d worker1 ping
<- ping
-> worker1: OK
</pre>

From within Python, use:
<pre>
>>> from celery.task.control import inspect
>>> i = inspect()
>>> i.ping()
{u'worker1': u'pong', u'worker2': u'pong'}
</pre>
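Building on @ping()@, here is a minimal sketch that checks whether a set of expected workers is alive; the worker names are assumptions (use the names you gave to @-n@):

<pre>
>>> # Minimal sketch: fail fast if an expected worker is missing.
>>> from celery.task.control import inspect
>>> expected = ["worker1", "worker2"]
>>> replies = inspect().ping() or {}  # ping() returns None if no workers reply
>>> [worker for worker in expected if worker not in replies]
[]
</pre>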

h3. Check worker configuration

<pre>
$ celeryctl inspect stats
<- stats
-> worker1: OK
    {u'autoscaler': {},
     u'consumer': {u'broker': {u'connect_timeout': 4,
                               u'hostname': u'127.0.0.1',
                               u'insist': False,
                               u'login_method': u'AMQPLAIN',
                               u'port': 5672,
                               u'ssl': False,
                               u'transport': u'amqp',
                               u'transport_options': {},
                               u'userid': u'maus',
                               u'virtual_host': u'maushost'},
                   u'prefetch_count': 8},
     u'pool': {u'max-concurrency': 2,
               u'max-tasks-per-child': None,
               u'processes': [11991, 11992],
               u'put-guarded-by-semaphore': True,
               u'timeouts': [None, None]},
     u'total': {}}
-> worker2: OK
    {u'autoscaler': {},
     u'consumer': {u'broker': {u'connect_timeout': 4,
                               u'hostname': u'maus.epcc.ed.ac.uk',
                               u'insist': False,
                               u'login_method': u'AMQPLAIN',
                               u'port': 5672,
                               u'ssl': False,
                               u'transport': u'amqp',
                               u'transport_options': {},
                               u'userid': u'maus',
                               u'virtual_host': u'maushost'},
                   u'prefetch_count': 8},
     u'pool': {u'max-concurrency': 2,
               u'max-tasks-per-child': None,
               u'processes': [21964, 21965],
               u'put-guarded-by-semaphore': True,
               u'timeouts': [None, None]},
     u'total': {}}
</pre>

From within Python, use:
<pre>
>>> from celery.task.control import inspect
>>> i = inspect()
>>> i.stats()
{...}
</pre>
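The @stats()@ dictionary can also be inspected programmatically, for example to check which broker host each worker is connected to; a minimal sketch, assuming the structure shown above:

<pre>
>>> # Sketch: print the broker host each worker is connected to.
>>> from celery.task.control import inspect
>>> stats = inspect().stats() or {}
>>> for name, info in stats.items():
...     print name, info["consumer"]["broker"]["hostname"]
worker1 127.0.0.1
worker2 maus.epcc.ed.ac.uk
</pre>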

h3. Check registered tasks

Check the tasks that each worker can execute:

<pre>
$ celeryctl inspect registered
<- registered
-> worker1: OK
    * celery.backend_cleanup
    * celery.chord
    * celery.chord_unlock
    * celery.ping
    * mauscelery.maustasks.MausGenericTransformTask
-> worker2: OK
    * celery.backend_cleanup
    * celery.chord
    * celery.chord_unlock
    * celery.ping
    * mauscelery.maustasks.MausGenericTransformTask
</pre>

From within Python, use:
<pre>
>>> from celery.task.control import inspect
>>> i = inspect()
>>> i.registered()
{u'worker1': [u'celery.backend_cleanup',
  u'celery.chord',
  u'celery.chord_unlock',
  u'celery.ping',
  u'mauscelery.maustasks.MausGenericTransformTask'],
 u'worker2': [u'celery.backend_cleanup',
  u'celery.chord',
  u'celery.chord_unlock',
  u'celery.ping',
  u'mauscelery.maustasks.MausGenericTransformTask']}
</pre>
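Since MAUS relies on @mauscelery.maustasks.MausGenericTransformTask@, it can be useful to check that every worker has it registered; a minimal sketch:

<pre>
>>> # Sketch: list any workers missing the MAUS task.
>>> from celery.task.control import inspect
>>> maus_task = "mauscelery.maustasks.MausGenericTransformTask"
>>> registered = inspect().registered() or {}
>>> [name for name, tasks in registered.items() if maus_task not in tasks]
[]
</pre>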

h3. Check task states

Check the tasks submitted for execution to a Celery worker by a client.

Check the tasks currently being executed by the worker:
<pre>
$ celeryctl inspect active
<- active
-> worker1: OK
    * {u'args': u'(\'{MAUS_SPILL_DOCUMENT}\', \'maus.epcc.ed.ac.uk (13067)\', 13)', 
u'time_start': 1327503329.679438, u'name': u'mauscelery.maustasks.MausGenericTransformTask', 
u'delivery_info': {u'consumer_tag': u'2', u'routing_key': u'celery', u'exchange': u'celery'}, 
u'hostname': u'worker1', u'acknowledged': True, u'kwargs': u'{}', 
u'id': u'7222138d-bb2d-4e1b-ba70-5c0f9e90aa08', u'worker_pid': 13059}
    * {...}
...
</pre>
Note the @worker_pid@, which specifies the process ID of the Celery sub-process executing the task.

Check the tasks received by the worker but awaiting execution:
<pre>
$ celeryctl inspect reserved
<- reserved
-> worker1: OK
    * {u'args': u'(\'{MAUS_SPILL_DOCUMENT}\', \'maus.epcc.ed.ac.uk (13067)\', 95)', 
u'time_start': None, u'name': u'mauscelery.maustasks.MausGenericTransformTask',
u'delivery_info': {u'consumer_tag': u'2', u'routing_key': u'celery', u'exchange': u'celery'}, 
u'hostname': u'worker1', u'acknowledged': False, u'kwargs': u'{}', 
u'id': u'ee1b3a88-58cc-4e26-b77d-4424ec9161d1', u'worker_pid': None}
    * {...}
...
</pre>
Note that @worker_pid@, the process ID of the Celery sub-process that will execute the task, is currently @None@.

From within Python, use:
<pre>
>>> from celery.task.control import inspect
>>> i = inspect()
>>> i.active()
{u'worker1': [...], ...}

>>> i.reserved()
{u'worker1': [...], ...}
</pre>

h3. Get results

When a task is submitted to a Celery worker, a unique task ID is generated. Using this ID you can collect the results of a task, provided you have not collected them already, using @celeryctl@ e.g.

<pre>
$ celeryctl result 95b3c56e-bea9-433d-aecd-920601c5ad20
{"digits": [{"tdc_counts": 1, "channel_id": {"fiber_number": 106, "tracker_numbe
r": 0, "type": "Tracker", "station_number": 1, "plane_number": 2}, "adc_counts":
...
</pre>

*Warning* Results can only be collected once. If you do this while running an analysis it may cause problems with your client (which may wait for results that you have already collected). This command should only be used for debugging, or to establish why things are going wrong, for example if a Celery worker needs to be terminated and you want to retrieve its results from RabbitMQ.
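From within Python, results can be fetched with Celery's @AsyncResult@ class; a minimal sketch, using the task ID from the example above:

<pre>
>>> from celery.result import AsyncResult
>>> # Wrap the task ID that was issued when the task was submitted.
>>> result = AsyncResult("95b3c56e-bea9-433d-aecd-920601c5ad20")
>>> result.ready()   # True once the worker has finished
>>> result.get()     # fetch the result - as above, this consumes it
</pre>

The same warning applies: fetching a result from Python consumes it, just as @celeryctl result@ does.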

h3. Purge queued tasks

You can purge tasks currently awaiting dispatch from RabbitMQ. This can be useful if one or more workers have run into problems and you don't want the backlog of pending tasks to be processed after you restart them.

<pre>
$ celeryctl purge
Purged 4 messages from 1 known task queue.
</pre>

From within Python, use:
<pre>
>>> from celery.task.control import discard_all
>>> discard_all()
4
</pre>

h3. Shut down workers

All workers can be shut down from within Python via:

<pre>
>>> from celery.task.control import broadcast
>>> broadcast("shutdown")
</pre>

Each worker will complete the tasks it is currently processing before shutting down.

Alternatively, you can use the Linux @kill@ command e.g.:
<pre>
$ ps -a
12614 pts/6    00:00:02 celeryd
12627 pts/6    00:00:00 celeryd
12628 pts/6    00:00:00 celeryd
$ kill -s TERM 12614
</pre>
The process ID should be that of the main worker process, which will usually have the lowest process ID.

To kill the worker *immediately*, without waiting for currently executing tasks to complete, use:
<pre>
$ kill -s KILL 12614
</pre>

To kill all @celeryd@ processes, use:
<pre>
$ ps auxww | grep celeryd | awk '{print $2}' | xargs kill -9
</pre>
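To shut down a specific worker rather than all of them, @broadcast@ accepts a @destination@ argument; a minimal sketch (the worker name is an assumption):

<pre>
>>> from celery.task.control import broadcast
>>> # Only the workers named in destination receive the command.
>>> broadcast("shutdown", destination=["worker1"])
</pre>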

h2. MAUS-specific monitoring and management

These actions are supported by Celery workers running on top of MAUS.

h3. Get worker MAUS configuration

Get the current MAUS configuration known to the workers, and the transforms that the workers will execute:

<pre>
>>> from celery.task.control import broadcast
>>> broadcast("get_maus_configuration", reply=True)
[{u'worker1': {u'config_id': 123, u'configuration': u'{}', u'transform': u'MapPyDoNothing'}}]
</pre>

where:

* @config_id@ - a unique ID for the current worker configuration.
* @configuration@ - the worker's MAUS configuration as forwarded by a client.
* @transform@ - the transforms that the worker will apply to a spill. A list represents a @MapPyGroup@ and nested lists represent nested @MapPyGroup@s (see the sketch below).
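For example, a nested transform specification might look like the following (the transform names are illustrative):

<pre>
>>> # Hypothetical example: equivalent to
>>> # MapPyGroup(MapPyBeamMaker, MapPyGroup(MapCppSimulation, MapPyDoNothing))
>>> ["MapPyBeamMaker", ["MapCppSimulation", "MapPyDoNothing"]]
</pre>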

h3. Birth the worker transforms with a MAUS configuration

Update a worker with a new MAUS configuration and transforms. Existing transforms have death invoked on them. The new transforms are created and then birth is invoked upon them with the new configuration.
<pre>
>>> from celery.task.control import broadcast
>>> broadcast("birth", arguments={"configuration": """{"TOFconversionFactor":0.01}""",
...     "transform": "MapPyDoNothing",
...     "config_id": 123}, reply=True)
[{u'maus.epcc.ed.ac.uk': {u'status': u'ok'}}]
</pre>

where:

* @config_id@ - a unique ID for the worker configuration. For the update to be applied, this must be different to that currently held by the workers, as returned by @get_maus_configuration@ above.
* @configuration@ - the MAUS configuration.
* @transform@ - the transforms that the worker will apply to a spill. A list represents a @MapPyGroup@ and nested lists represent nested @MapPyGroup@s.

If using @MapCppSimulation@, birth can take a while, and there is a risk that the broadcast will return before all the workers are ready. To avoid this, run:

<pre>
>>> # Get the current number of workers.
>>> from celery.task.control import inspect, broadcast
>>> i = inspect()
>>> num_workers = len(i.active())
>>> print num_workers
>>> # 'workers' (a transform specification) and 'json_config_doc' (a MAUS
>>> # configuration document) are assumed to have been defined already.
>>> results = broadcast("birth", arguments={"transform": workers,
...     "configuration": json_config_doc, "config_id": 1234},
...     reply=True, timeout=300, limit=num_workers)
>>> print results
</pre>

where:

* @timeout@ is the maximum time in seconds to wait for any worker to respond.
* @limit@ is the number of worker replies to wait for before returning.

This ensures that the broadcast will wait up to 300s for responses from all the workers, but if all the workers reply promptly it exits there and then, without waiting the full 300s.

h3. Death the worker transforms

Invoke death on the workers' transforms:
<pre>
>>> from celery.task.control import broadcast
>>> broadcast("death", reply=True)
[{u'maus.epcc.ed.ac.uk': {u'status': u'ok'}}]
</pre>

If @death@ is called twice, the second call does nothing.