Feature #408

parallelization using celery project distributed task queue

Added by Tunnell, Christopher almost 13 years ago. Updated about 12 years ago.

Assignee: Tunnell, Christopher
Category: Online reconstruction
Start date: 15 April 2011
I closed #156. See the celery project distributed task queue. It should be really quick to parallelize MAUS using this.
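Celery itself needs a message broker, but the core pattern it provides — producers pushing tasks onto a queue that a pool of worker processes consumes — can be sketched with the standard library alone. A minimal sketch (the worker function and the spill-doubling work are illustrative stand-ins, not MAUS or celery code):

```python
import multiprocessing

def worker(task_queue, result_queue):
    # each worker pulls tasks until it sees the None sentinel; this mimics
    # the broker/worker split a celery deployment would provide
    for spill in iter(task_queue.get, None):
        result_queue.put((spill, spill * 2))  # stand-in for reconstruction

ctx = multiprocessing.get_context('fork')  # posix-only start method
task_queue = ctx.Queue()
result_queue = ctx.Queue()
workers = [ctx.Process(target=worker, args=(task_queue, result_queue))
           for _ in range(3)]
for w in workers:
    w.start()
for spill in range(10):
    task_queue.put(spill)
for _ in workers:
    task_queue.put(None)  # one sentinel per worker shuts it down
for w in workers:
    w.join()
results = dict(result_queue.get() for _ in range(10))
```

With celery the queue would live in an external broker, so the workers could run on other machines rather than as local processes.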


Updated by Tunnell, Christopher almost 13 years ago

This could be a lot of fun to do if somebody wants to pick it up from me!


Updated by Rogers, Chris over 12 years ago

(Mucking about for some other work) See this...


Updated by Rogers, Chris over 12 years ago

I added Roger as a watcher because I thought it was something he might be interested in...


Updated by Rogers, Chris over 12 years ago

Code snippet for parallelisation using Python multiprocessing - wraps an arbitrary function call to multiprocess it:

import sys
import time
import random
import multiprocessing

def function_with_queue(args):
  """Wrapper function to put multiprocessing output into a queue

  @param args tuple of (function_call, queue, function_arg, index) where
      * function_call is a reference to the function that we want to wrap
      * queue is the queue that will hold return values from the function
      * function_arg is the argument to function_call
      * index is an index indicating function_arg's position in the inputs

  tuple of (index, output) is placed into queue; if function_call throws an
  exception, the exception is placed on the queue instead
  """
  (function_call, queue, function_arg, index) = args
  try:
    queue.put((index, function_call(*function_arg)))
  except Exception:
    queue.put((index, sys.exc_info()[1]))

def process_list(function_call, list_of_args, max_n_processes):
  """Run multiprocessing on a list of arguments

  @param function_call multiprocess this function call
  @param list_of_args list of tuples of arguments for function_call
  @param max_n_processes maximum number of concurrent processes to use

  @returns list of return values, one for each function call. List is always
           sorted into same order as input
  """
  manager = multiprocessing.Manager()
  queue = manager.Queue() # queue stores return values
  pool = multiprocessing.Pool(max_n_processes) # pool runs multiprocess
  # each process needs:
  #    reference to queue to put return values in
  #    index (to sort return values)
  #    plus function call and argument
  new_list_of_args = [(function_call, queue, x, i)
                      for i, x in enumerate(list_of_args)]
  # run the processes
  pool.map(function_with_queue, new_list_of_args)
  # sort output into a list
  out_list = []
  while not queue.empty():
    out_list.append(queue.get())
  out_list.sort()
  for i, item in enumerate(out_list):
    out_list[i] = item[1]
  # cleanup
  pool.close()
  pool.join()
  return out_list

def __test_run(item):
  if item == 'throw':
    raise RuntimeError('Throwing item '+str(item))
  time.sleep(random.random()) # returns to queue in random order
  return 'output', item

def test_function_with_queue():
  queue = multiprocessing.Queue()
  function_with_queue((__test_run, queue, ("input",), 5))
  assert queue.get() == (5, ("output", "input"))
  function_with_queue((__test_run, queue, ("throw",), 7))
  out = queue.get()
  assert out[0] == 7
  if not isinstance(out[1], RuntimeError):
    assert False # should raise an exception in parent process

def test_process_list():
  process_out = process_list(__test_run, [(i,) for i in range(10)], 3)
  for i in range(10):
    assert process_out[i] == ('output', i) # check sorting


Updated by Tunnell, Christopher over 12 years ago

How does birth fit in?

With the celery project... which I got most of the way with before getting constantly distracted... you can define the imports you want any thread/machine to do.


Updated by Tunnell, Christopher over 12 years ago

  • Priority changed from Normal to Low

Updated by Rogers, Chris over 12 years ago

  • Category changed from common_py to Online reconstruction

Updated by Rogers, Chris over 12 years ago

Presumably birth() runs as in the single-threaded case. multiprocessing makes a new process copy of the current process and executes in that. So birth() runs as normal; presumably death() has to be run in each item of the pool. I hadn't thought about that, I confess. But as we have no internal state, what can death() do anyway?
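The per-worker setup described above can be sketched with Pool's initializer argument, which runs a function once in each worker process as it starts. A minimal sketch (birth and process_spill are hypothetical stand-ins for MAUS module calls, not the real API):

```python
import multiprocessing

STATE = {}

def birth():
    # hypothetical stand-in for a module's birth(); runs once per worker
    STATE['initialised'] = True

def process_spill(spill):
    # hypothetical per-spill work; relies on birth() having run in this
    # worker process before any spills were dispatched to it
    return ('output', spill, STATE.get('initialised', False))

ctx = multiprocessing.get_context('fork')  # posix-only start method
pool = ctx.Pool(2, initializer=birth)
results = pool.map(process_spill, range(4))
pool.close()
pool.join()
```

Note that multiprocessing.Pool has no symmetric per-worker shutdown hook, which matches the point above that per-worker death() handling is unclear.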


Updated by Tunnell, Christopher about 12 years ago

  • Status changed from Open to Rejected

