Feature #1417
Feature request: Parallel running
Description
People are now increasingly using MAUS for larger-scale analysis runs, such as 10k - 100k event runs, which take a good number of hours. The architecture is built to support parallel processing, but this is not implemented at present. It would be a great help if we could exploit multiple cores with MAUS (not just in the grid batch production version).
Updated by Rogers, Chris almost 10 years ago
This is supported by the existing online running implementation. How to do this in offline mode is not well documented, and some code development could make it more straightforward. Note that there will be some fiddling with the way the online running works as part of #1312.
Updated by Dobbs, Adam over 9 years ago
Hi Durga, you mentioned at the CM there was a way of hacking this by spoofing MAUS into thinking it was running online. Could you explain how I go about this? Thanks.
Updated by Rajaram, Durga over 9 years ago
Hi Adam -- thanks for the reminder. I've created an issue #1426 for updating the documentation for online deployment.
Presumably you're interested in running simulation and/or offline reconstruction?
Updated by Rogers, Chris over 9 years ago
Durga, would you like me to suggest something here? e.g. a bastardised online script?
Updated by Rajaram, Durga over 9 years ago
- File distributed_offline.py added
- File reduce_output.py added
Chris -- I modified the online script [ got rid of the django, all reducers but one, and a few other changes ].
I've been testing it against data to check speed improvements and output consistency with 'normal' offline running.
Ran OK on a few smaller runs [ 4993, 4995 ] but it is dying on some larger runs [ e.g. 4964 ] -- seems memory-related and not celery -- still debugging. Also giving a false negative message saying input-transform failed though there are no errors in the log.
Haven't checked with MC 'input' yet.
I've attached the script distributed_offline.py along with an outputter, reduce_output.py (goes in bin/examples).
If you can take a look and see if I messed up or missed something, that'd be great.
With mongodb and rabbitmq running:
python distributed_offline.py -daq_data_path <path> -daq_data_file <file>
Updated by Rogers, Chris over 9 years ago
Great!
Also giving a false negative message saying input-transform failed though there are no errors in the log.
Yes, the script just looks for the existence of a process; it doesn't check the return code. In principle, in the control room the reconstruction runs forever and it is an error if any process (e.g. reconstruction) stops running. If you run e.g. test_analyze_data_online.py it also produces the same error message when the data runs out, and this is okay.
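For anyone tightening this up later, here is a minimal sketch (hypothetical, not the actual check in the script) of the difference between testing for a process's existence and inspecting its return code:

# Hypothetical sketch -- not the real monitoring code in distributed_offline.py.
# Shows "is the process still there?" vs "did it exit cleanly?".
import subprocess

# Example child process; any long-running command would do here.
proc = subprocess.Popen(["python", "-c", "import time; time.sleep(2)"])

if proc.poll() is None:
    print("process still exists")           # this is all the current check tells us

return_code = proc.wait()                   # block until it finishes, get the code
if return_code == 0:
    print("process ended cleanly, e.g. because the data ran out -- not an error")
else:
    print("process really failed, return code %d" % return_code)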
Okay, I will have a look at your script...
Updated by Rajaram, Durga over 9 years ago
- File sh_distributed.png added
- File sh_distributed-run2.png added
- File distributed_offline_logs.tar.gz added
- File nspills_offline.png added
- File nspills_distributed.png added
- File nspills_distributed-run2.png added
- File sh_offline.png added
Comparing the time taken by vanilla analyze_data_offline vs distributed_offline
[ on a quad-core i5 3450 3.1 GHz, SL6 ]:
Run 4995 -- 403 spills -- 2m12s distributed (3m22s vanilla offline)
Run 4993 -- 3110 spills -- 17m57s distributed (45m10s vanilla offline)
The bad news is that for Run 4993 the outputs are not the same.
Distributed_offline seems to lose events.
e.g. the spill number plot shows that the standard offline script went through 3110 spills, whereas the distributed process wrote out only 3059 spills.
The effect is also visible in the plot of the number of slab hits in TOF1.
Re-ran the distributed_offline script and the second time around it ended up with a different number of events in the output (2757).
Inspecting the celery log shows that some tasks failed because of a hard time limit of 10s
[2014-03-06 01:18:35,846: ERROR/MainProcess] Task mauscelery.maustasks.MausGenericTransformTask[6220e6e1-446f-4842-9cee-edd7cef05f41] raised exception: TimeLimitExceeded(10,)
Traceback (most recent call last):
  File "/home/durga/maus-merge/third_party/install/lib/python2.7/site-packages/celery-2.5.5-py2.7.egg/celery/concurrency/processes/pool.py", line 370, in _on_hard_timeout
    raise TimeLimitExceeded(hard_timeout)
TimeLimitExceeded: 10
[2014-03-06 01:18:35,938: ERROR/MainProcess] Hard time limit (10s) exceeded for mauscelery.maustasks.MausGenericTransformTask[6220e6e1-446f-4842-9cee-edd7cef05f41]
But these failures account for only 4 spills. What happened to the others?
The reduce_output.log shows that the outputter was sent a SIGINT.
Not sure who sent the signal. Maybe when one of the workers finishes [ and the other is still working], the outputter is sent an interrupt?
logs and plots attached
Updated by Rogers, Chris over 9 years ago
Inspecting the celery log shows that some tasks failed because of a hard time limit of 10s
Could try the CELERYD_TASK_TIME_LIMIT parameter, which I guess should be added to src/common_py/mauscelery/celeryconfig.py -- see e.g. http://docs.celeryproject.org/en/latest/configuration.html#celeryd-task-time-limit
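Something like the following in celeryconfig.py might do it -- the numbers are placeholders and the soft limit is optional:

# Possible addition to src/common_py/mauscelery/celeryconfig.py; values are
# placeholders -- pick limits comfortably above the slowest spill's transform time.
CELERYD_TASK_TIME_LIMIT = 300       # hard limit (seconds): the worker is killed beyond this
CELERYD_TASK_SOFT_TIME_LIMIT = 240  # soft limit (seconds): raises SoftTimeLimitExceeded first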
The reduce_output.log shows that the outputter was sent a SIGINT.
Not sure who sent the signal. Maybe when one of the workers finishes [ and the other is still working], the outputter is sent an interrupt?
That is exactly correct. If the reconstruction dies, the outputter is supposed to finish the current data and then end. The parallel script should poll the workers every 10 seconds and write a line in the terminal output like "outputter is still working, see logs".
The relevant handling is in src/common_py/framework/merge_output.py (sketched roughly below):
- line 343: _execute_inner_loop receives a SIGINT and sets keyboard_interrupt = True
- the inner loop keeps going until a StopIteration (raised when there are no more events), which is raised up to line 352; if keyboard_interrupt == True the _execute_inner_loop function exits, returning False
- execute keeps running unless _execute_inner_loop returns False, in which case the execution ends and we enter the finally statement (end_of_job, etc.)
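To make that flow concrete, here is a heavily simplified, self-contained sketch of the same pattern -- this is not the actual merge_output.py code, just an illustration of the keyboard_interrupt flag, the StopIteration exit and the finally clause:

# Simplified illustration of the control flow described above.
# NOT the real merge_output.py -- names and behaviour are approximate.
import signal
import time

keyboard_interrupt = False

def sigint_handler(signum, frame):
    # SIGINT does not kill the process; it just raises a flag, which is only
    # acted on once the currently available data has been drained.
    global keyboard_interrupt
    keyboard_interrupt = True

signal.signal(signal.SIGINT, sigint_handler)

def get_available_spills():
    # Stand-in for "whatever spills are currently waiting in the document store".
    return iter([1, 2, 3])

def execute_inner_loop():
    # Drain the available spills until StopIteration, then report whether
    # execute() should carry on (True) or shut down after a SIGINT (False).
    spills = get_available_spills()
    try:
        while True:
            spill = next(spills)              # StopIteration when no more events
            print("merging spill %d" % spill)
    except StopIteration:
        pass
    return not keyboard_interrupt

def execute():
    try:
        while execute_inner_loop():
            time.sleep(1)                     # wait, then poll for more data
    finally:
        print("end_of_job")                   # runs whether or not we were interrupted

if __name__ == "__main__":
    execute()                                 # press Ctrl-C to exercise the SIGINT path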
Distributed_offline seems to lose events.
My experience of setting up infrastructure for the ROOT docstore shows that about 1% of jobs get lost in passing over the socket, even when running on the same PC (localhost). I assumed Celery would have better efficiency, but maybe not? I have not got much trust in Celery; it seems quite flaky.
Ps: there is also a type_of_dataflow called multi_process that may be a better route to go down. It was in the test tests/integration/test_distributed_processing/_test_multithreaded_vs_single_threaded.py but didn't work reliably, so I commented it out. This implementation uses celery for the task queue but holds reconstructed spills in memory (rather than placing them in mongodb). The problem I had was that spills were reconstructed twice or not at all. At least there the infrastructure is only celery, so it might be easier to handle.
- InputPySpillGenerator - MapPyBeamGenerator - ReducePyDoNothing - OutputCppRoot;
- utility script to split the root file;
- InputCppRoot - MapCppSimulation - MapPyRecon - etc - ReducePyDoNothing - OutputCppRoot
where "utility script to split the root file" needs to be written
Updated by Dobbs, Adam over 9 years ago
Deterministic MC running would be important for me, so that when I vary parameters and look at the results, I know I am comparing apples with apples. Could this be solved by considering a multi-threaded approach, instead of / in addition to full distributed processing?
Also, I believe Chris Hunt has already got a hack of the ROOT file stitching method working. Not the most elegant solution, but it works.
Updated by Rajaram, Durga over 9 years ago
Ps: there is also a type_of_dataflow called multi_process that may be a better route to go down.
I started with multi_process initially but found that the output had only the header and footer and was missing the actual spill data. Will look more closely. Losing events is worrisome though, especially if we're looking at efficiencies and reproducibility.
A more lightweight solution would be a stitch/unstitch on disk, rather than in memory - so we run 10 jobs with different random seeds and then stitch them together at the end, without having to worry about task queues, messaging, blah blah, all of the pain that comes from multiprocessing inline.
Or run a trio of jobs like
- InputPySpillGenerator - MapPyBeamGenerator - ReducePyDoNothing - OutputCppRoot;
- utility script to split the root file;
- InputCppRoot - MapCppSimulation - MapPyRecon - etc - ReducePyDoNothing - OutputCppRoot
where "utility script to split the root file" needs to be written
The memory footprint will be large, and there's no sharing of common resources [ e.g. geometry ], but from an implementation point of view running n jobs 'grid-like' and splitting/stitching might be the most straightforward. If we want true multithreading at the event level it'll take some work.
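For the stitching step, ROOT already provides hadd for merging files; the same thing can be sketched in PyROOT with a TChain, where the tree name "Spill" is again an assumption about the output format:

# Hypothetical stitching sketch: merge the per-job output files back into one.
# Assumes each job wrote a TTree called "Spill" (adjust to the real tree name);
# roughly equivalent to running ROOT's hadd over the same files.
import ROOT

def stitch_root_files(part_names, out_name, tree_name="Spill"):
    chain = ROOT.TChain(tree_name)
    for name in part_names:
        chain.Add(name)                  # register each per-job output file
    chain.Merge(out_name)                # write the concatenated tree to out_name

if __name__ == "__main__":
    stitch_root_files(["job_0.root", "job_1.root", "job_2.root"], "stitched.root")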
I see that GEANT4 has some MPI-based parallelization examples in examples/extended/parallel/
http://geant4.web.cern.ch/geant4/UserDocumentation/Doxygen/examples_doc/html/Examples_MPI.html
Interesting..
Updated by Rogers, Chris over 9 years ago
Fine to spend some time trying to bludgeon the existing system. I would be very nervous about implementing multithreading or anything fancier than what we have right now. Let's say this is a low-priority task (we can probably win more with the existing multiprocessing plus some optimisation work, and we can't staff anything fancier -- physics comes first right now).