Online reconstruction - original design (14/09/11)

Original online reconstruction design notes for the Software Sustainability Institute/MAUS collaboration (#691)


  • Online operation of software / online data quality / online reconstruction.
  • Check that the detectors can be successfully reconstructed. This verifies integrity of data and is used to check satisfactory operation of the hardware.
  • Check that the accelerator can be successfully operated. Verifies integrity of the accelerator hardware.
  • Real-time monitoring is needed to detect failures immediately, not a week later during an off-line analysis.
  • May want to tweak the hardware during execution.

Processing live data feeds

This is not a direct development task but does motivate requirements.

  • MAUS InputCppData worker: Input: Binary or JSON file. Output: JSON document.
  • InputCppData uses an unpacker (by Yordan) which uses CERN's ALICE DAQ.
  • Unpacker can read from file or socket.
  • MICE will output to a socket. This only works in the control room.
  • It would be useful if InputCppData took a compiler flag or configuration parameter to determine whether to read from a file or a socket. Alternatively, the form of the input specification could determine which to use.
  • MICE will write one JSON input to the socket exactly every second. Each input corresponds to an insertion of the titanium rod into the proton beam, which creates the spill event.
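
The file-or-socket choice above could be driven by the input specification itself. The following is a minimal sketch only: the `socket://host:port` scheme, the function names, and the one-JSON-document-per-line framing are assumptions made for illustration and are not part of InputCppData or the unpacker, which read binary DAQ data.

```python
import json
import socket


def open_input(spec):
    """Return a readable text stream for a file path or a 'socket://host:port' spec.

    The 'socket://' prefix is a hypothetical convention for this sketch.
    """
    if spec.startswith("socket://"):
        host, _, port = spec[len("socket://"):].partition(":")
        conn = socket.create_connection((host, int(port)))
        return conn.makefile("r", encoding="utf-8")
    return open(spec, "r", encoding="utf-8")


def read_documents(stream):
    """Yield one JSON document per non-empty line (assumed framing)."""
    for line in stream:
        line = line.strip()
        if line:
            yield json.loads(line)
```

Because `read_documents` only needs an iterable of lines, the same code path can be exercised from a file on a test machine and from the socket in the control room.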


  • X = data[]
  • Y = map(ProcessMap, X)
  • Z = reduce(MakePlot, Y)
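
The three steps above map directly onto Python's built-in `map` and `functools.reduce`. In this sketch `process_map` and `make_plot` are hypothetical stand-ins for a MAUS map group and a plotting reducer, and the spill layout is invented for illustration.

```python
from functools import reduce


def process_map(spill):
    # Hypothetical map step: summarise one spill independently of all others.
    return {"spill_id": spill["spill_id"], "n_digits": len(spill["digits"])}


def make_plot(histogram, mapped):
    # Hypothetical reduce step: fold one mapped document into a running
    # histogram of "number of digits per spill".
    n = mapped["n_digits"]
    histogram[n] = histogram.get(n, 0) + 1
    return histogram


# X = data[]
X = [{"spill_id": i, "digits": [0] * (i % 3)} for i in range(6)]
# Y = map(ProcessMap, X)
Y = list(map(process_map, X))
# Z = reduce(MakePlot, Y)
Z = reduce(make_plot, Y, {})
```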

Want to process data as it arrives as a "window" e.g.

  • t=30, process X[0:30]
  • t=60, process X[0:60]
  • etc.
  • Birth is deterministic - different workers run it and get into same state.
  • Data into process() is time independent - it doesn't matter if spill 1 happens before or after spill 2.
  • Death is called.
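
A minimal sketch of the window idea combined with the birth/process/death lifecycle. The `CountingWorker` class, its `threshold` setting, and the `adc_counts` field are hypothetical and chosen only to make the example runnable.

```python
class CountingWorker:
    """Hypothetical worker following the birth/process/death lifecycle."""

    def birth(self, config):
        # Deterministic: the state depends only on the configuration, so
        # different workers running birth() end up in the same state.
        self.threshold = config["threshold"]
        return True

    def process(self, spill):
        # Time independent: the result depends only on this spill,
        # not on when it arrived or what was processed before it.
        return sum(1 for adc in spill["adc_counts"] if adc > self.threshold)

    def death(self):
        return True


def process_window(worker, data, t):
    """Process every spill that has arrived by time t, i.e. data[0:t]."""
    return [worker.process(spill) for spill in data[0:t]]
```

Since `process()` is time independent, the results for `X[0:30]` are simply a prefix of the results for `X[0:60]`, which is what makes the growing window safe to recompute or to cache.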


  • MICE will produce a JSON input exactly every second onto the socket.
  • JSON input is processed by input worker and unpacked into a JSON document.

Distributed map jobs (#704)

  • On input of JSON document, want to spawn distributed map job. One map group job (pipeline of map workers) spawned per second corresponding to their arrival from the input. The same map group job is run in different threads/machines.
    • No need at this time for data streaming, that is, for different map workers in the same map group job to work on different JSON documents concurrently.
  • Use Celery, an asynchronous distributed task queue for Python.
    • Invoking MapGroup "birth" function during Celery "import modules" step will save development effort.
  • Input goes into the map group.
  • JSON document output is deposited in data store.
  • Useful if map worker instances (and map group jobs) had unique IDs, for logging and debugging purposes.
  • Initial prototype could use a mock wrapper round InputData to sleep then emit a JSON document every second. Could read JSON from directory or do a version that creates random JSON documents.
  • Reconfiguring is unlikely ever to happen mid-run (i.e. within an hour or so of continuous data taking). However, people may restart it every few hours if they want to add new components.
  • Celery workers only changed weekly/monthly/yearly. Concern if one worker is missed or not updated, what then?
  • OK to drop a few events but concern is if a corrupt doc crashes a worker.
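
The mock input, the unique job IDs, and the "corrupt document must not crash a worker" concern can be prototyped in plain Python before Celery is involved. This sketch deliberately does not use Celery; `mock_input` and `run_map_group` are hypothetical names, and the map workers are modelled as plain functions.

```python
import json
import time
import uuid


def mock_input(documents, interval=1.0):
    """Sleep, then emit one JSON string per interval, imitating the spill rate."""
    for doc in documents:
        time.sleep(interval)
        yield json.dumps(doc)


def run_map_group(raw, workers):
    """Run a pipeline of map functions over one raw document without raising."""
    job_id = uuid.uuid4().hex  # unique ID for logging and debugging
    try:
        doc = json.loads(raw)
        for worker in workers:
            doc = worker(doc)
        return {"job_id": job_id, "ok": True, "doc": doc}
    except Exception as err:
        # A corrupt document is dropped and reported instead of
        # taking the worker down.
        return {"job_id": job_id, "ok": False, "error": repr(err)}
```

Returning an error record rather than raising keeps the decision about dropped events with the caller, which can log the `job_id` and carry on with the next spill.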

Data store (#705)

  • At present an in-memory buffer is used to cache data - see temp_file in src/common_py/
  • Use CouchDB, a high-speed document-oriented database, to support multiple I/O requests and large data volumes.
  • CouchDB will be flushed after every run.
  • A run may be every 2 days or so.
  • A run is normally 2 hours and rarely 8. It's mainly dictated by the shift schedule, with one shift per day. Shifts can't be too long for safety reasons, and there isn't the manpower for 24/7 running.
  • Define a data store interface. Initial prototype can be in memory buffer (as at present), then hook in CouchDB.
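
One possible shape for such an interface, with the in-memory prototype behind it. The method names (`put`, `changes_since`, `clear`) are assumptions for this sketch; a CouchDB-backed class would implement the same three methods.

```python
from abc import ABC, abstractmethod


class DataStore(ABC):
    """Hypothetical data store interface shared by mappers and reducers."""

    @abstractmethod
    def put(self, doc):
        """Store one document and return its sequence number."""

    @abstractmethod
    def changes_since(self, seq):
        """Return (documents added after seq, latest sequence number)."""

    @abstractmethod
    def clear(self):
        """Flush the store, e.g. at the end of a run."""


class InMemoryDataStore(DataStore):
    """Initial prototype: a list held in memory."""

    def __init__(self):
        self._docs = []

    def put(self, doc):
        self._docs.append(doc)
        return len(self._docs)

    def changes_since(self, seq):
        return self._docs[seq:], len(self._docs)

    def clear(self):
        self._docs = []
```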


  • Reducers take JSON documents from the data store and produce them.
  • Want to process all data from start of run or data from past N minutes (e.g. 15). N should be configurable.
  • Reducer can query data store every 30s for changes since last query. Reducers need to incrementally process inputs in light of this. Reducers need to store intermediate state.
  • Reducers can run in parallel e.g. N Python scripts run on N machines or in N threads in a single program. No need to colocate with mappers, or even spawn from same application.
  • A failure of a single Reducer must not bring down any other, or the mappers (graceful failure) but the error should be recorded for notifying the user.
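
A sketch of a reducer that is polled periodically, processes only what is new, and keeps intermediate state between polls. The data store is modelled as a plain list, and the `adc_count` field and class name are hypothetical.

```python
class RunningMeanReducer:
    """Hypothetical reducer: running mean of a field across all documents seen."""

    def __init__(self):
        self.last_seen = 0   # position in the store reached by the last poll
        self.count = 0       # intermediate state carried between polls
        self.total = 0.0
        self.errors = []     # recorded so the user can be notified

    def poll(self, store):
        """Fold in documents added since the last poll and return the mean."""
        new_docs = store[self.last_seen:]
        self.last_seen = len(store)
        for doc in new_docs:
            try:
                self.total += float(doc["adc_count"])
                self.count += 1
            except (KeyError, TypeError, ValueError) as err:
                # A bad document is recorded, not propagated.
                self.errors.append(repr(err))
        return self.total / self.count if self.count else None
```

In use, something like a timer would call `poll()` every 30s; each reducer instance holds its own state, so one failing reducer does not affect the others.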


  • Histograms allow detector performance to be monitored.
  • Converts JSON document into JPG or EPS (or other image file).
  • May be 100s of plots per output.
  • Should be easy to configure/extend by non-Python programmer.
  • Initial version can use Python matplotlib. (#702)
  • Another version should use PyROOT. (#707)
  • Future possibilities include gnuplot.
  • Needs to allow aggregation of data to incrementally update the plot (so include a test for this!)
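
The aggregation requirement, and the test it calls for, can be illustrated independently of the plotting back end. This sketch uses plain Python bin counting rather than matplotlib or PyROOT; the class name and fixed-width binning are assumptions.

```python
class IncrementalHistogram:
    """Hypothetical fixed-width histogram that can be filled batch by batch."""

    def __init__(self, n_bins, x_min, x_max):
        self.n_bins = n_bins
        self.x_min = x_min
        self.x_max = x_max
        self.counts = [0] * n_bins

    def fill(self, values):
        """Add a batch of values; values outside [x_min, x_max) are ignored."""
        width = (self.x_max - self.x_min) / self.n_bins
        for value in values:
            if self.x_min <= value < self.x_max:
                index = int((value - self.x_min) / width)
                # Guard against floating-point rounding at the upper edge.
                self.counts[min(index, self.n_bins - 1)] += 1


# Filling in two batches must give the same plot data as filling once.
whole = IncrementalHistogram(10, 0, 10)
whole.fill([0.5, 1.5, 1.7, 9.9, 10, -1])
parts = IncrementalHistogram(10, 0, 10)
parts.fill([0.5, 1.5])
parts.fill([1.7, 9.9, 10, -1])
assert whole.counts == parts.counts
```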

JSON and histogramming

  • Allow cuts e.g. if channel_id.type = "tracker" then print the adc_count for each digit in digits.
  • 1D example with adc_counts for each digit in digits.
  • 2D example with both adc_counts and tdc_counts visualised.
  • Title and axes labels.
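
A sketch of how a cut might select the values for the 1D and 2D examples. The document layout (a `digits` list whose entries carry `channel_id.type`, `adc_counts` and `tdc_counts`) is inferred from the bullets above and is an assumption; the `select` helper is hypothetical.

```python
def select(spill, detector_type, fields):
    """Return one tuple of the requested fields per digit passing the cut."""
    rows = []
    for digit in spill.get("digits", []):
        if digit.get("channel_id", {}).get("type") == detector_type:
            rows.append(tuple(digit[field] for field in fields))
    return rows


spill = {
    "digits": [
        {"channel_id": {"type": "tracker"}, "adc_counts": 12, "tdc_counts": 3},
        {"channel_id": {"type": "tof"}, "adc_counts": 7, "tdc_counts": 1},
        {"channel_id": {"type": "tracker"}, "adc_counts": 20, "tdc_counts": 5},
    ]
}

# 1D: adc_counts for every tracker digit.
adc_values = select(spill, "tracker", ["adc_counts"])
# 2D: (adc_counts, tdc_counts) pairs for every tracker digit.
adc_tdc_pairs = select(spill, "tracker", ["adc_counts", "tdc_counts"])
```

Keeping the cut and the field list as data rather than code is one way to let non-Python programmers configure new plots.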



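import ROOT

# Book a 1D histogram: name, title, number of bins, x_min, x_max
h1 = ROOT.TH1F("h1", "title", 100, 0, 10)

for x in range(1, 3):
    h1.Fill(x)  # assumed loop body; it is missing from the original notes

c1 = ROOT.TCanvas()
h1.Draw()  # assumed: draw the histogram on the canvas just created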
  • Wrapper to ROOT.
  • PyROOT is more closely coupled to the physics model, so upgrading PyROOT means upgrading the physics model.


  • Store plots output by reducers "somewhere" so they can be rendered.
  • Plots need to be rendered in a web interface so they're visible in the control room and externally.
  • If a reducer fails then this should be visualised too.
  • Data transfer to browser via HTTP POST or whatever is appropriate.
  • Initial version can be directory of files and PHP script/HTML file to list them. (#703)
  • CT can provide access to web server.
  • Use an automatically updating web interface built with the Django Python web framework. (#706)
  • Useful for user to be able to tweak time intervals, axes ranges etc.
  • David Colling may be able to provide guidance as he's done similar things before.
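
The "directory of files plus a listing page" starting point could equally be generated from Python. This is a sketch under assumptions: the `build_index` helper is hypothetical, only browser-renderable JPG/PNG files are listed (EPS would need converting first), and a plain meta refresh stands in for a proper automatically updating interface.

```python
import html
from pathlib import Path

IMAGE_SUFFIXES = {".jpg", ".jpeg", ".png"}


def build_index(plot_dir, refresh_seconds=30):
    """Return an HTML page listing every plot image found in plot_dir."""
    names = sorted(
        path.name
        for path in Path(plot_dir).iterdir()
        if path.suffix.lower() in IMAGE_SUFFIXES
    )
    items = "\n".join(
        '<li><img src="{0}" alt="{0}"></li>'.format(html.escape(name, quote=True))
        for name in names
    )
    return (
        "<html><head>"
        '<meta http-equiv="refresh" content="{}">'
        "</head>\n<body><ul>\n{}\n</ul></body></html>"
    ).format(int(refresh_seconds), items)
```

Writing the returned string to an `index.html` next to the plots gives a page that reloads every 30s, matching the viewing interval in the operational dimensions.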

Operational dimensions

  • 1 JSON document every second.
  • Map group output document is 10-100K.
  • If a reducer processes data from the last 15 minutes: 15 minutes x 60 s/minute x 1 doc/s x 100K per map output = 90,000K, roughly 90MB.
  • Option to pre-scale and drop events e.g. 1 every 100, if not enough processing power for the above OR to cut reconstruction code but this is highly undesirable.
  • May be 100s of plots per output.
  • View every 30s.
  • A run is normally 2 hours and rarely 8.
  • Worst case for a full run: 8 hours x 3,600 s/hour x 1 doc/s x 100K per map output = 2,880,000K, roughly 2.9GB (2812.5MB at 1024K per MB).
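
The sizing arithmetic above can be checked with a few lines of Python. The constants are the figures quoted in this section (1 document per second, 100K per map output at the upper end); the helper name is hypothetical.

```python
DOCS_PER_SECOND = 1   # one JSON document per spill, one spill per second
DOC_SIZE_K = 100      # upper end of the 10-100K map group output size


def volume_k(seconds):
    """Data volume in K accumulated over the given number of seconds."""
    return seconds * DOCS_PER_SECOND * DOC_SIZE_K


window_k = volume_k(15 * 60)    # reducer window of 15 minutes
run_k = volume_k(8 * 60 * 60)   # rare 8 hour run, worst case

window_mb = window_k / 1024     # converting with 1MB = 1024K
run_mb = run_k / 1024
```

With a 2 hour run, which is the normal case, the same function gives a quarter of the worst-case figure.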


  • Need to (optionally) bundle CouchDB and Celery with the MAUS download. ROOT is similar.
  • Only the test machine and control room need to run these - bundling is a nice-to-have.
  • Useful for test machine to make deployment easier.

Updated by Jackson, Mike almost 9 years ago · 14 revisions