Feature #1417

Feature request: Parallel running

Added by Dobbs, Adam almost 10 years ago. Updated over 9 years ago.

Status: Open
Priority: Normal
Category: Code Management
Target version:
Start date: 18 February 2014
Due date:
% Done: 0%
Estimated time:
Workflow: New Issue

Description

People are now more commonly using MAUS for larger-scale analysis runs, such as 10k - 100k event runs, which take a good number of hours. The architecture is built to support parallel processing, but this is not implemented at present. It would be a great help if we could exploit multiple cores with MAUS (not just in the grid batch production version).


Files

distributed_offline.py (12.5 KB) -- modified version of analyze_data_online for parallel offline running -- Rajaram, Durga, 05 March 2014 10:48
reduce_output.py (910 Bytes) -- outputter -- Rajaram, Durga, 05 March 2014 10:48
sh_distributed.png (7.24 KB) -- #slab hits from distributed_offline -- Rajaram, Durga, 06 March 2014 08:31
sh_distributed-run2.png (7.08 KB) -- #slab hits from distributed_offline, rerun -- Rajaram, Durga, 06 March 2014 08:31
distributed_offline_logs.tar.gz (383 KB) -- logs from running distributed_offline -- Rajaram, Durga, 06 March 2014 08:31
nspills_offline.png (5.33 KB) -- plot of spill number from analyze_data_offline -- Rajaram, Durga, 06 March 2014 08:31
nspills_distributed.png (5.6 KB) -- plot of spill number from distributed_offline -- Rajaram, Durga, 06 March 2014 08:31
nspills_distributed-run2.png (5.52 KB) -- plot of spill number from distributed_offline, rerun -- Rajaram, Durga, 06 March 2014 08:31
sh_offline.png (7.49 KB) -- #slab hits from analyze_data_offline -- Rajaram, Durga, 06 March 2014 08:31
#1

Updated by Rogers, Chris almost 10 years ago

This is supported by the existing online running implementation. It is not well documented how to do this in offline mode, and some code development could make it more straightforward. Note that there will be some fiddling with the way online running works as part of #1312.

#2

Updated by Dobbs, Adam over 9 years ago

Hi Durga, you mentioned at the CM that there was a way of hacking this by spoofing MAUS into thinking it was running online; could you explain how I go about this? Thanks.

#3

Updated by Rajaram, Durga over 9 years ago

Hi Adam -- thanks for the reminder. I've created an issue #1426 for updating the documentation for online deployment.
Presumably you're interested in running simulation and/or offline reconstruction?

#4

Updated by Rogers, Chris over 9 years ago

Durga, would you like me to suggest something here? e.g. a bastardised online script?

#5

Updated by Rajaram, Durga over 9 years ago

Chris -- I modified the online script [ got rid of django, dropped all reducers but one, and made a few other changes ].

I've been testing it against data to check speed improvements and output consistency with 'normal' offline running.
It ran OK on a few smaller runs [ 4993, 4995 ] but is dying on some larger runs [ e.g. 4964 ] -- it seems memory related and not celery, but I am still debugging. It is also giving a false negative message saying input-transform failed, though there are no errors in the log.
I haven't checked with MC 'input' yet.

I've attached the script distributed_offline.py, along with an outputter reduce_output.py [ which goes in bin/examples ].

If you can take a look and see if I messed up or missed something, that'd be great.

With mongodb and rabbitmq running:

python distributed_offline.py -daq_data_path <path> -daq_data_file <file> 

#6

Updated by Rogers, Chris over 9 years ago

Great!

Also giving a false negative message saying input-transform failed though there are no errors in the log.

Yes, the script just looks for the existence of a process; it doesn't check the return code. In principle, in the control room the reconstruction runs forever and it is an error if any process (e.g. reconstruction) stops running. If you run e.g. test_analyze_data_online.py it also produces the same error message when the data runs out, and that is okay.
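For reference, a minimal sketch of the distinction, assuming the workers are launched with subprocess.Popen (the worker name below is illustrative, not the actual script's):

# Minimal sketch: distinguish "process no longer running" (what the script
# checks now) from "process exited with an error" (what it could check).
# Assumes the worker was started with subprocess.Popen; names are illustrative.
import subprocess

proc = subprocess.Popen(["python", "some_worker.py"])  # hypothetical worker

return_code = proc.poll()  # None while the process is still running
if return_code is None:
    print("worker still running")
elif return_code == 0:
    print("worker finished cleanly -- e.g. the data ran out, not an error")
else:
    print("worker actually failed, return code %d" % return_code)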

Okay, I will have a look at your script...

#7

Updated by Rajaram, Durga over 9 years ago

Comparing the time taken by vanilla analyze_data_offline vs distributed_offline
[ on a quad core i5 3450 3.1 GHz, SL6 ]

Run 4995 -- 403 spills -- 2m12s distributed (3m22s vanilla offline)
Run 4993 -- 3110 spills -- 17m57s distributed (45m10s vanilla offline)

The bad news is that for Run 4993 the outputs are not the same.
Distributed_offline seems to lose events.
E.g. the spill-number plot shows that the standard offline script went through 3110 spills, whereas the distributed process wrote out only 3059.
The effect is also visible in the plot of #slab hits in TOF1.

I re-ran the distributed_offline script, and the second time around it ended up with a different number of events in the output (2757).

Inspecting the celery log shows that some tasks failed because of a hard time limit of 10s

[2014-03-06 01:18:35,846: ERROR/MainProcess] Task mauscelery.maustasks.MausGenericTransformTask[6220e6e1-446f-4842-9cee-edd7cef05f41] raised exception: TimeLimitExceeded(10,)
Traceback (most recent call last):
  File "/home/durga/maus-merge/third_party/install/lib/python2.7/site-packages/celery-2.5.5-py2.7.egg/celery/concurrency/processes/pool.py", line 370, in _on_hard_timeout
    raise TimeLimitExceeded(hard_timeout)
TimeLimitExceeded: 10
[2014-03-06 01:18:35,938: ERROR/MainProcess] Hard time limit (10s) exceeded for mauscelery.maustasks.MausGenericTransformTask[6220e6e1-446f-4842-9cee-edd7cef05f41]

But these failures account for only 4 spills. What happened to the others?
The reduce_output.log shows that the outputter was sent a SIGINT.
Not sure who sent the signal. Maybe when one of the workers finishes [ and the other is still working], the outputter is sent an interrupt?

logs and plots attached

#8

Updated by Rogers, Chris over 9 years ago

Inspecting the celery log shows that some tasks failed because of a hard time limit of 10s

Could try the CELERYD_TASK_TIME_LIMIT parameter, which I guess should be added to src/common_py/mauscelery/celeryconfig.py -- see e.g. http://docs.celeryproject.org/en/latest/configuration.html#celeryd-task-time-limit
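Something along these lines, though how it fits with the rest of MAUS's celeryconfig.py would need checking, and the numbers are just placeholders:

# Sketch of extra settings for src/common_py/mauscelery/celeryconfig.py.
# The 10-minute values are placeholders, not tuned numbers.
CELERYD_TASK_TIME_LIMIT = 600        # hard limit: the worker is killed and replaced

# Optional: raise SoftTimeLimitExceeded inside the task first, so it can
# clean up before the hard limit is enforced.
CELERYD_TASK_SOFT_TIME_LIMIT = 540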

The reduce_output.log shows that the outputter was sent a SIGINT.
Not sure who sent the signal. Maybe when one of the workers finishes [ and the other is still working], the outputter is sent an interrupt?

That is exactly correct. If the reconstruction dies, the outputter is supposed to finish the current data and then end. The parallel script should poll the workers every 10 seconds and print a line in the terminal output like "outputter is still working, see logs".

The relevant handling is in src/common_py/framework/merge_output.py (a rough sketch of the flow follows the list):

  • line 343 _execute_inner_loop receives a SIGINT, sets keyboard_interrupt = True
  • inner loop keeps going until a StopIteration (raised on no more events), which is raised up to line 352; if keyboard_interrupt == True the _execute_inner_loop function exits returning False
  • execute keeps running unless _execute_inner_loop returns False, in which case the execution ends and we enter the finally statement (end_of_job, etc)
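In outline the pattern is roughly the following standalone sketch (not the actual merge_output.py code, just the same SIGINT / StopIteration shape):

# Standalone sketch of the control flow described above -- not the actual
# merge_output.py code, just the same pattern: finish the data in hand and
# only stop the job once a SIGINT has been seen AND the events run out.
import signal
import time

keyboard_interrupt = [False]          # mutable flag shared with the handler

def on_sigint(signum, frame):
    keyboard_interrupt[0] = True      # note the interrupt, keep processing

signal.signal(signal.SIGINT, on_sigint)

def execute_inner_loop(events, process):
    # Work through whatever events are currently available.
    try:
        while True:
            process(next(events))
    except StopIteration:
        pass                          # no more events for now
    # Only ask the caller to stop once an interrupt has been seen.
    return not keyboard_interrupt[0]

def execute(events, process, end_of_job):
    try:
        while execute_inner_loop(events, process):
            time.sleep(1)             # wait for more data to turn up
    finally:
        end_of_job()                  # end-of-job handling always runs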

Distributed_offline seems to lose events.

My experience of setting up infrastructure for the ROOT docstore shows that about 1% of jobs get lost in passing over the socket, even when running on the same PC (localhost). I assumed Celery would have better efficiency, but maybe not? I have not got much trust in Celery; it seems quite flaky.

PS: there is also a type_of_dataflow called multi_process that may be a better route to go down. It was exercised in the test tests/integration/test_distributed_processing/_test_multithreaded_vs_single_threaded.py but didn't work reliably, so I commented it out. This implementation uses celery for the task queue but holds reconstructed spills in memory (rather than placing them in mongodb). The problem I had was that spills were reconstructed twice or not at all. At least there the infrastructure is only celery, so it might be easier to handle.
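If anyone wants to try it, switching dataflow should just be a configuration change, something like the snippet below -- only the type_of_dataflow name comes from the above; the rest of the datacard mechanics is an assumption to double-check against the MAUS configuration defaults:

# Hypothetical datacard sketch: select the multi_process dataflow instead of
# the default single-threaded pipeline.
type_of_dataflow = "multi_process"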

A more lightweight solution would be a stitch/unstitch on disk, rather than in memory - so we run 10 jobs with different random seeds and then stitch them together at the end, without having to worry about task queues, messaging, blah blah, all of the pain that comes from multiprocessing inline. Or run a trio of jobs like
  • InputPySpillGenerator - MapPyBeamGenerator - ReducePyDoNothing - OutputCppRoot;
  • utility script to split the root file;
  • InputCppRoot - MapCppSimulation - MapPyRecon - etc - ReducePyDoNothing - OutputCppRoot
    where "utility script to split the root file" needs to be written
#9

Updated by Dobbs, Adam over 9 years ago

Deterministic MC running would be important for me, so that when I vary parameters and look at the results, I know I am comparing apples with apples. Could this be solved by considering a multi-threaded approach, instead of / in addition to full distributed processing?

Also, I believe Chris Hunt has already got a hack of the ROOT file stitching method working. Not the most elegant solution, but it works.

#10

Updated by Rajaram, Durga over 9 years ago

Ps: there is also a type_of_dataflow called multi_process that may be a better route to go down.

I started with multi_process initially but found that the output had only the header and footer and was missing the actual spill data. I will look more closely. Losing events is worrisome though, especially if we're looking at efficiencies and reproducibility.

A more lightweight solution would be a stitch/unstitch on disk, rather than in memory - so we run 10 jobs with different random seeds and then stitch them together at the end, without having to worry about task queues, messaging, blah blah, all of the pain that comes from multiprocessing inline.

Or run a trio of jobs like
  • InputPySpillGenerator - MapPyBeamGenerator - ReducePyDoNothing - OutputCppRoot;
  • utility script to split the root file;
  • InputCppRoot - MapCppSimulation - MapPyRecon - etc - ReducePyDoNothing - OutputCppRoot
    where "utility script to split the root file" needs to be written

The memory footprint will be large, and there's no sharing of common resources [ e.g. the geometry ], but from an implementation point of view running n jobs 'grid-like' and splitting/stitching might be the most straightforward. If we want true multithreading at the event level it'll take some work.
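For the stitching half, a rough PyROOT sketch might look like the following (again assuming the output tree is called "Spill"; ROOT's hadd utility does much the same job from the command line):

# Hypothetical stitcher sketch (PyROOT): chain the per-job output files and
# merge them into one. The tree name "Spill" and file names are assumptions.
import glob
import ROOT

def stitch(pattern, out_name, tree_name="Spill"):
    chain = ROOT.TChain(tree_name)
    for file_name in sorted(glob.glob(pattern)):
        chain.Add(file_name)          # register each per-job output file
    chain.Merge(out_name)             # write the combined tree to out_name

if __name__ == "__main__":
    stitch("maus_output_part*.root", "maus_output_merged.root")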

I see that GEANT4 has some MPI-based parallelization examples in examples/extended/parallel/
http://geant4.web.cern.ch/geant4/UserDocumentation/Doxygen/examples_doc/html/Examples_MPI.html
Interesting..

#11

Updated by Rogers, Chris over 9 years ago

Fine to spend some time trying to bludgeon the existing system. I would be very nervous about implementing multithreading or anything fancier than what we have right now. Let's say this is a low-priority task (we can probably win more with the existing multiprocessing and some optimisation work, and we can't staff anything fancier -- physics comes first right now).
