Feature #408
Parallelization using the Celery distributed task queue
Description
I closed #156. See the Celery project (a distributed task queue). It should be really quick to parallelize MAUS using this.
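For concreteness, a minimal sketch of what a Celery-based version might look like, assuming one task per spill. The broker/backend URLs, module name and process_spill function are illustrative only and not part of MAUS, and the API shown is current Celery rather than whatever was available at the time:

# tasks.py -- hypothetical sketch, not actual MAUS code
from celery import Celery

# broker/backend URLs are assumptions; any RabbitMQ or Redis broker would do
app = Celery('maus_tasks', broker='amqp://localhost//', backend='rpc://')

@app.task
def process_spill(spill):
    """Hypothetical per-spill task: run the map step on one spill."""
    # ... call the MAUS mappers on `spill` here ...
    return spill

The caller would then fan spills out to however many workers are running, e.g. results = [process_spill.delay(s) for s in spills] followed by [r.get() for r in results] to collect them in order.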
Updated by Tunnell, Christopher about 12 years ago
This could be a lot of fun to do if somebody wants to pick it up from me!
Updated by Rogers, Chris almost 12 years ago
(Mucking about for some other work) See this...
Updated by Rogers, Chris almost 12 years ago
I added Roger as a watcher because I thought it was something he might be interested in...
Updated by Rogers, Chris almost 12 years ago
Code snippet for parallelisation using Python multiprocessing - it wraps an arbitrary function call so it can be run across multiple processes:
import sys
import time
import random
import multiprocessing

def function_with_queue(args):
    """
    Wrapper function to put multiprocessing output into a queue
    @param args tuple of (function_call, queue, function_arg, index) where
        * function_call is a reference to the function that we want to wrap
        * queue is the queue that will hold return values from the function
        * function_arg is the argument to function_call
        * index is an index indicating function_arg's position in the inputs
    A tuple of (index, output) is placed into queue; if function_call throws
    an exception, the exception is placed on the queue instead
    """
    (function_call, queue, function_arg, index) = args
    try:
        queue.put((index, function_call(*function_arg)))
    except:
        queue.put((index, sys.exc_info()[1]))

def process_list(function_call, list_of_args, max_n_processes):
    """
    Run multiprocessing on a list of arguments
    @param function_call multiprocess this function call
    @param list_of_args list of tuples of arguments for function_call
    @param max_n_processes maximum number of concurrent processes to use
    @returns list of return values, one for each function call. List is
    always sorted into the same order as the input
    """
    manager = multiprocessing.Manager()
    queue = manager.Queue()                      # queue stores return values
    pool = multiprocessing.Pool(max_n_processes) # pool runs multiprocess
    # each process needs:
    #     reference to queue to put return values in
    #     index (to sort return values)
    #     plus function call and argument
    new_list_of_args = [(function_call, queue, x, i)
                        for i, x in enumerate(list_of_args)]
    # run the processes
    pool.map(function_with_queue, new_list_of_args)
    # sort output into a list
    out_list = []
    while not queue.empty():
        out_list.append(queue.get())
    out_list.sort()
    for i, item in enumerate(out_list):
        out_list[i] = item[1]
    # cleanup
    pool.close()
    return out_list

def __test_run(item):
    if item == 'throw':
        raise RuntimeError('Throwing item ' + str(item))
    time.sleep(random.random()) # returns to queue in random order
    return 'output', item

def test_function_with_queue():
    queue = multiprocessing.Queue()
    function_with_queue((__test_run, queue, ("input",), 5))
    assert queue.get() == (5, ("output", "input"))
    function_with_queue((__test_run, queue, ("throw",), 7))
    out = queue.get()
    assert out[0] == 7
    # a malformed argument tuple should raise in the parent process
    raised_exception = False
    try:
        function_with_queue(('no_args',))
    except:
        raised_exception = True
    assert raised_exception

def test_process_list():
    process_out = process_list(__test_run, [(i,) for i in range(10)], 3)
    for i in range(10):
        assert process_out[i] == ('output', i) # check sorting

test_function_with_queue()
test_process_list()
Updated by Tunnell, Christopher almost 12 years ago
How does birth() fit in?
With the Celery project... which I got most of the way through before getting constantly distracted... you can define the imports you want any thread/machine to do.
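The "define imports" mechanism referred to here is presumably Celery's imports setting, which tells every worker which modules to load at startup. A sketch, with made-up module names:

from celery import Celery

app = Celery('maus_tasks', broker='amqp://localhost//')

# every worker process/machine imports these modules when it starts, so the
# task definitions (and any MAUS setup they pull in) are available everywhere;
# in older Celery versions this was the CELERY_IMPORTS setting
app.conf.imports = ('maus_map_tasks', 'maus_reduce_tasks')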
Updated by Rogers, Chris almost 12 years ago
- Category changed from common_py to Online reconstruction
Updated by Rogers, Chris over 11 years ago
Presumably birth() is as per the single-threaded case. multiprocessing makes a new copy of the current process and executes in that, so birth() runs as normal; presumably death() has to be run in each item of the pool. I hadn't thought about that, I confess. But as we have no internal state, what can death() do anyway?
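One way the birth()/death() question could play out with the multiprocessing pool above (a sketch only; MapPyExample is a stand-in for a real MAUS mapper, not actual MAUS code): Pool's initializer hook runs once in each worker process, so birth() can be called there, while there is no symmetric finaliser hook for death():

import multiprocessing

class MapPyExample(object):
    """Stand-in mapper with the birth()/process()/death() interface."""
    def birth(self, config):
        self.config = config
        return True
    def process(self, spill):
        return spill
    def death(self):
        return True

_mapper = None  # one mapper instance per worker process

def _worker_init(config):
    """Pool initializer: runs exactly once in each worker process."""
    global _mapper
    _mapper = MapPyExample()
    _mapper.birth(config)  # birth() as per the single-threaded case

def _worker_process(spill):
    return _mapper.process(spill)

def run_pool(spills, config, max_n_processes):
    pool = multiprocessing.Pool(max_n_processes,
                                initializer=_worker_init,
                                initargs=(config,))
    out = pool.map(_worker_process, spills)
    # there is no per-worker shutdown hook, so death() would have to be run
    # some other way (e.g. one shutdown task per worker); with no internal
    # state, as noted above, it may not matter
    pool.close()
    pool.join()
    return out

if __name__ == '__main__':
    print(run_pool([{'spill': i} for i in range(5)], {}, 3))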