Parallelization using the Celery project distributed task queue
I closed #156. See the Celery project. It should be really quick to parallelize MAUS using this.
Updated by Rogers, Chris about 11 years ago
Code snippet for parallelisation using Python multiprocessing. It wraps around an arbitrary function call to multiprocess it:
import multiprocessing
import random
import time


def function_with_queue(args):
    """
    Wrapper function to put multiprocessing output into a queue
    @param args tuple of (function_call, queue, function_arg, index) where
      * function_call is a reference to the function that we want to wrap
      * queue is the queue that will hold return values from the function
      * function_arg is the tuple of arguments to function_call
      * index is an index indicating function_arg's position in the inputs
    tuple of (index, Output) is placed into queue; if function_call throws an
    exception, the exception is placed on the queue instead of Output
    """
    (function_call, queue, function_arg, index) = args
    try:
        queue.put((index, function_call(*function_arg)))
    except Exception as exc:
        # put the exception object itself: sys.exc_info() includes a
        # traceback, which cannot be pickled onto the queue
        queue.put((index, exc))


def process_list(function_call, list_of_args, max_n_processes):
    """
    Run multiprocessing on a list of arguments
    @param function_call multiprocess this function call
    @param list_of_args list of tuples of arguments for function_call
    @param max_n_processes maximum number of concurrent processes to use
    @returns list of return values, one for each function call.
    List is always sorted into same order as input
    """
    manager = multiprocessing.Manager()
    queue = manager.Queue()  # queue stores return values
    pool = multiprocessing.Pool(max_n_processes)  # pool runs multiprocess
    # each process needs:
    #   reference to queue to put return values in
    #   index (to sort return values)
    #   plus function call and argument
    new_list_of_args = [(function_call, queue, x, i)
                        for i, x in enumerate(list_of_args)]
    # run the processes
    pool.map(function_with_queue, new_list_of_args)
    # sort output into a list
    out_list = []
    while not queue.empty():
        out_list.append(queue.get())
    out_list.sort(key=lambda item: item[0])
    # strip the index, keeping only the return value
    out_list = [item[1] for item in out_list]
    # cleanup
    pool.close()
    pool.join()
    return out_list


def __test_run(item):
    if item == 'throw':
        raise RuntimeError('Throwing item ' + str(item))
    time.sleep(random.random())  # returns to queue in random order
    return 'output', item


def test_function_with_queue():
    queue = multiprocessing.Queue()
    function_with_queue((__test_run, queue, ("input",), 5))
    assert queue.get() == (5, ("output", "input"))
    function_with_queue((__test_run, queue, ("throw",), 7))
    out = queue.get()
    assert out[0] == 7 and isinstance(out[1], RuntimeError)
    try:
        function_with_queue(('no_args',))
    except ValueError:
        pass  # unpacking the malformed args raises in the parent process
    else:
        assert False  # should raise an exception in parent process


def test_process_list():
    process_out = process_list(__test_run, [(i,) for i in range(10)], 3)
    for i in range(10):
        assert process_out[i] == ('output', i)  # check sorting


if __name__ == "__main__":
    test_function_with_queue()
    test_process_list()
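For comparison, a minimal sketch of the same idea without the queue. It relies on Pool.map returning its results in input order, so no index or sort is needed. The names call_and_capture, process_list_ordered and square are hypothetical and only for illustration; this is not what the snippet above does, just a possible simplification.

```python
import multiprocessing


def call_and_capture(args):
    """Call function_call(*function_arg); return any exception instead of raising it."""
    function_call, function_arg = args
    try:
        return function_call(*function_arg)
    except Exception as exc:
        # returned rather than raised, so one failure does not abort the whole map
        return exc


def process_list_ordered(function_call, list_of_args, max_n_processes):
    """Hypothetical variant of process_list that relies on Pool.map ordering."""
    with multiprocessing.Pool(max_n_processes) as pool:
        # Pool.map returns results in the same order as its input list
        return pool.map(call_and_capture,
                        [(function_call, x) for x in list_of_args])


def square(x):
    """Toy workload (illustrative only); rejects negative input."""
    if x < 0:
        raise ValueError("negative input")
    return x * x


if __name__ == "__main__":
    print(process_list_ordered(square, [(3,), (-1,), (2,)], 2))
```

The trade-off is that the caller has to check each entry for an exception instance, which is the same contract as the queue-based version.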
Updated by Rogers, Chris almost 11 years ago
Presumably birth() runs as in the single-threaded case. multiprocessing makes a new process that is a copy of the current process and executes in that. So birth() runs as normal, and presumably death() has to be run in each process of the pool. I hadn't thought about that, I confess. But as we have no internal state, what can death() do anyway?