Feature #408

Parallelization using the Celery distributed task queue

Added by Tunnell, Christopher over 10 years ago. Updated almost 10 years ago.

Status: Rejected
Priority: Low
Assignee: Tunnell, Christopher
Category: Online reconstruction
Target version:
Start date: 15 April 2011
Due date:
% Done: 0%
Estimated time:
Workflow:

Description

I closed #156. See the Celery project. It should be really quick to parallelize MAUS using it.

#1

Updated by Tunnell, Christopher over 10 years ago

This could be a lot of fun to do if somebody wants to pick it up from me!

#2

Updated by Rogers, Chris about 10 years ago

(Mucking about for some other work) See this...

http://wiki.python.org/moin/ParallelProcessing

#3

Updated by Rogers, Chris about 10 years ago

I added Roger as a watcher because I thought it was something he might be interested in...

#4

Updated by Rogers, Chris about 10 years ago

Code snippet for parallelisation using Python multiprocessing. It wraps an arbitrary function call so that it can be run across multiple processes:

import multiprocessing
import random
import time

def function_with_queue(args):
    """
    Wrapper function to put multiprocessing output into a queue

    @param args tuple of (function_call, queue, function_arg, index) where
        * function_call is a reference to the function that we want to wrap
        * queue is the queue that will hold return values from the function
        * function_arg is a tuple of positional arguments for function_call
        * index is an index indicating function_arg's position in the inputs

    A tuple of (index, output) is placed into queue; if function_call raises
    an exception, (index, exception) is placed on the queue instead
    """
    # unpacking is outside the try block, so malformed args raise in the caller
    (function_call, queue, function_arg, index) = args
    try:
        queue.put((index, function_call(*function_arg)))
    except Exception as exc:
        # report the exception object rather than letting it kill the worker
        queue.put((index, exc))

def process_list(function_call, list_of_args, max_n_processes):
    """
    Run multiprocessing on a list of arguments

    @param function_call multiprocess this function call
    @param list_of_args list of tuples of arguments for function_call
    @param max_n_processes maximum number of concurrent processes to use

    @returns list of return values, one for each function call. List is always
             sorted into the same order as the input
    """
    manager = multiprocessing.Manager()
    queue = manager.Queue() # manager queue proxies can be passed to pool workers
    pool = multiprocessing.Pool(max_n_processes) # pool runs the processes
    # each process needs:
    #    reference to queue to put return values in
    #    index (to sort return values)
    #    plus function call and argument
    new_list_of_args = [(function_call, queue, x, i)
                        for i, x in enumerate(list_of_args)]
    # run the processes; map blocks until every task has finished
    pool.map(function_with_queue, new_list_of_args)
    # cleanup the pool
    pool.close()
    pool.join()
    # drain the queue and sort output back into input order
    out_list = []
    while not queue.empty():
        out_list.append(queue.get())
    out_list.sort(key=lambda item: item[0])
    out_list = [item[1] for item in out_list]
    manager.shutdown()
    return out_list

def __test_run(item):
    if item == 'throw':
        raise RuntimeError('Throwing item '+str(item))
    time.sleep(random.random()) # returns to queue in random order
    return 'output', item

def test_function_with_queue():
    queue = multiprocessing.Queue()
    function_with_queue((__test_run, queue, ("input",), 5))
    assert queue.get() == (5, ("output", "input"))
    function_with_queue((__test_run, queue, ("throw",), 7))
    out = queue.get()
    assert out[0] == 7
    assert isinstance(out[1], RuntimeError)
    try:
        function_with_queue(('no_args',))
    except ValueError:
        pass # expected: malformed args raise in the parent process
    else:
        raise AssertionError('malformed args should raise an exception')

def test_process_list():
    process_out = process_list(__test_run, [(i,) for i in range(10)], 3)
    for i in range(10):
        assert process_out[i] == ('output', i) # check sorting

# the guard is required where multiprocessing uses the "spawn" start method
if __name__ == '__main__':
    test_function_with_queue()
    test_process_list()
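For comparison, `multiprocessing.Pool.map` already returns its results in the same order as its inputs, so the index and the Manager queue are not strictly needed to keep the output sorted. A minimal sketch of that alternative, assuming the only other requirement is that an exception in one call is reported for that item rather than aborting the whole map. `call_and_capture`, `process_list_ordered` and `square_or_fail` are hypothetical names, not part of MAUS.

```python
import multiprocessing


def call_and_capture(args):
    """Call function_call(*function_arg); return any exception instead of raising."""
    function_call, function_arg = args
    try:
        return function_call(*function_arg)
    except Exception as exc:  # report the failure for this item only
        return exc


def process_list_ordered(function_call, list_of_args, max_n_processes):
    """Hypothetical variant of process_list: Pool.map keeps input order."""
    with multiprocessing.Pool(max_n_processes) as pool:
        return pool.map(call_and_capture,
                        [(function_call, args) for args in list_of_args])


def square_or_fail(x):
    """Hypothetical example task that fails on negative input."""
    if x < 0:
        raise ValueError("negative input: %d" % x)
    return x * x


if __name__ == "__main__":
    results = process_list_ordered(square_or_fail, [(3,), (-1,), (2,)], 2)
    print(results)
```

Here the first and last entries are 9 and 4, and the middle entry is the `ValueError` instance raised in the worker. The trade-off is that results only arrive once the whole map has finished, exactly as in the queue version.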
#5

Updated by Tunnell, Christopher about 10 years ago

How does birth() fit in?

With the Celery project (which I got most of the way through before getting constantly distracted), you can define the imports you want every thread/machine to do.
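In plain Python multiprocessing, the closest analogue to this kind of per-worker setup is the `initializer`/`initargs` arguments of `multiprocessing.Pool`, which run a function once in each worker process as it starts. This is a minimal sketch under assumptions, not how Celery does it: `birth`, `process` and the `scale` config key are hypothetical stand-ins rather than the MAUS module interface.

```python
import multiprocessing
import os

_mapper_state = {}  # hypothetical per-process state, filled in by birth()


def birth(config):
    """Hypothetical per-worker setup, run once in each pool process."""
    _mapper_state["scale"] = config["scale"]
    _mapper_state["pid"] = os.getpid()


def process(item):
    # uses the state that birth() set up in this worker process
    return _mapper_state["pid"], item * _mapper_state["scale"]


if __name__ == "__main__":
    with multiprocessing.Pool(3, initializer=birth,
                              initargs=({"scale": 10},)) as pool:
        results = pool.map(process, range(6))
    print([value for _, value in results])  # [0, 10, 20, 30, 40, 50]
```

The returned process IDs show that at most three distinct workers did the work, each having called `birth()` exactly once before processing any item.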

#6

Updated by Tunnell, Christopher about 10 years ago

  • Priority changed from Normal to Low
#7

Updated by Rogers, Chris about 10 years ago

  • Category changed from common_py to Online reconstruction
#8

Updated by Rogers, Chris almost 10 years ago

Presumably birth() works as in the single-threaded case: multiprocessing makes a copy of the current process and executes in that copy, so birth() runs as normal. Presumably death() then has to be run in each process of the pool. I confess I hadn't thought about that. But as we have no internal state, what can death() do anyway?
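As far as I know, `multiprocessing.Pool` offers an `initializer` hook but no matching per-worker teardown hook, so one way to guarantee that death() runs once in every worker is to manage the worker processes explicitly. A minimal sketch, assuming a hypothetical mapper with `birth(config)`, `process(item)` and `death()` methods (`HypotheticalMapper`, `worker` and `run` are illustrative names, not the MAUS API): each worker calls birth(), processes items from a task queue until it receives a `None` sentinel, then calls death() and reports back.

```python
import multiprocessing


class HypotheticalMapper:
    """Stand-in for a MAUS-style map module; the real interface may differ."""

    def birth(self, config):
        self.scale = config["scale"]  # per-process setup

    def process(self, item):
        return item * self.scale

    def death(self):
        pass  # nothing to clean up: this mapper has no external state


def worker(config, tasks, results):
    """Run one mapper in this process: birth once, process items, death once."""
    mapper = HypotheticalMapper()
    mapper.birth(config)
    for index, item in iter(tasks.get, None):  # None is the stop sentinel
        results.put(("output", index, mapper.process(item)))
    mapper.death()
    results.put(("death", multiprocessing.current_process().name, None))


def run(items, config, n_processes):
    tasks = multiprocessing.Queue()
    results = multiprocessing.Queue()
    workers = [multiprocessing.Process(target=worker,
                                       args=(config, tasks, results))
               for _ in range(n_processes)]
    for proc in workers:
        proc.start()
    for index, item in enumerate(items):
        tasks.put((index, item))
    for _ in workers:
        tasks.put(None)  # one stop sentinel per worker
    outputs, deaths = {}, []
    # drain before join: expect one result per item plus one death per worker
    for _ in range(len(items) + n_processes):
        kind, key, value = results.get()
        if kind == "output":
            outputs[key] = value
        else:
            deaths.append(key)
    for proc in workers:
        proc.join()
    return [outputs[i] for i in range(len(items))], deaths


if __name__ == "__main__":
    out, deaths = run(list(range(10)), {"scale": 2}, 3)
    print(out, len(deaths))
```

The results queue is drained before the workers are joined, because joining a process that still has unflushed data on a `multiprocessing.Queue` can deadlock.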

#9

Updated by Tunnell, Christopher almost 10 years ago

  • Status changed from Open to Rejected

dupe
