Как мне распараллелить простой цикл в Python?
Это, вероятно, тривиальный вопрос, но как мне распараллелить следующий цикл в python?
# setup output lists
output1 = list()
output2 = list()
output3 = list()
for j in range(0, 10):
# calc individual parameter value
parameter = j * offset
# call the calculation
out1, out2, out3 = calc_stuff(parameter = parameter)
# put results into correct output list
output1.append(out1)
output2.append(out2)
output3.append(out3)
Я знаю, как запустить отдельные потоки в Python, но я не знаю, как "собрать" результаты.
несколько процессов тоже было бы хорошо - все, что проще всего для этого случая. Я использую в настоящее время Linux, но код должен работать на Windows и Mac, а-хорошо.
какой самый простой способ распараллелить этот код?
9 ответов:
использование нескольких потоков на CPython не даст вам лучшей производительности для кода pure-Python из-за глобальной блокировки интерпретатора (GIL). Я предлагаю использовать
multiprocessing
модуль вместо:pool = multiprocessing.Pool(4) out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))
обратите внимание, что это не будет работать в интерактивном интерпретаторе.
чтобы избежать обычного FUD вокруг GIL: в любом случае не было бы никакого преимущества в использовании потоков для этого примера. Ты хочу использовать процессы, а не потоки, потому что они избежать целой кучи проблем.
распараллелить простой цикл for, joblib приносит много значения к Сырцовой пользе многопроцессорного. Не только короткий синтаксис, но и такие вещи, как прозрачная группировка итераций, когда они очень быстры (чтобы удалить накладные расходы) или захват трассировки дочернего процесса, чтобы иметь лучшую отчетность об ошибках.
отказ от ответственности: я автор joblib.
какой самый простой способ распараллелить этот код?
мне очень нравится
concurrent.futures
для этого доступно в Python3 начиная с версии 3.2 - и через backport до 2.6 и 2.7 на PyPi.вы можете использовать потоки или процессы и использовать тот же интерфейс.
многопроцессорная обработка
положите это в файл - futuretest.py:
import concurrent.futures import time, random # add some random sleep time offset = 2 # you don't supply these so def calc_stuff(parameter=None): # these are examples. sleep_time = random.choice([0, 1, 2, 3, 4, 5]) time.sleep(sleep_time) return parameter / 2, sleep_time, parameter * parameter def procedure(j): # just factoring out the parameter = j * offset # procedure # call the calculation return calc_stuff(parameter=parameter) def main(): output1 = list() output2 = list() output3 = list() start = time.time() # let's see how long this takes # we can swap out ProcessPoolExecutor for ThreadPoolExecutor with concurrent.futures.ProcessPoolExecutor() as executor: for out1, out2, out3 in executor.map(procedure, range(0, 10)): # put results into correct output list output1.append(out1) output2.append(out2) output3.append(out3) finish = time.time() # these kinds of format strings are only available on Python 3.6: # time to upgrade! print(f'original inputs: {repr(output1)}') print(f'total time to execute {sum(output2)} = sum({repr(output2)})') print(f'time saved by parallelizing: {sum(output2) - (finish-start)}') print(f'returned in order given: {repr(output3)}') if __name__ == '__main__': main()
и вот вывод:
$ python3 -m futuretest original inputs: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0] total time to execute 33 = sum([0, 3, 3, 4, 3, 5, 1, 5, 5, 4]) time saved by parallellizing: 27.68999981880188 returned in order given: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324]
многопоточность
изменить сейчас
ProcessPoolExecutor
доThreadPoolExecutor
, и снова запустите модуль:$ python3 -m futuretest original inputs: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0] total time to execute 19 = sum([0, 2, 3, 5, 2, 0, 0, 3, 3, 1]) time saved by parallellizing: 13.992000102996826 returned in order given: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324]
теперь вы сделали как многопоточность и многопроцессорность!
обратите внимание на производительность и использование обоих вместе.
выборка слишком мала для сравнения результатов.
однако я подозреваю, что многопоточность будет быстрее, чем многопроцессорная обработка в целом, особенно в Windows, поскольку Windows не работает поддержка разветвления, поэтому каждый новый процесс должен занять время для запуска. На Linux или Mac они, вероятно, будут ближе.
вы можете вложить несколько потоков в несколько процессов, но рекомендуется не использовать несколько потоков для выделения нескольких процессов.
почему вы не используете потоки и один мьютекс для защиты одного глобального списка?
import os import re import time import sys import thread from threading import Thread class thread_it(Thread): def __init__ (self,param): Thread.__init__(self) self.param = param def run(self): mutex.acquire() output.append(calc_stuff(self.param)) mutex.release() threads = [] output = [] mutex = thread.allocate_lock() for j in range(0, 10): current = thread_it(j * offset) threads.append(current) current.start() for t in threads: t.join() #here you have output list filled with data
имейте в виду, вы будете так же быстро, как ваш медленный поток
есть ряд преимуществ в использовании Рэй:
- вы можете распараллеливать несколько машин в дополнение к нескольким ядрам (с тем же кодом).
- эффективная обработка числовых данных через общую память (и сериализация нулевого копирования).
- высокая пропускная способность задачи с распределенным планированием.
- отказоустойчивость.
в вашем случае вы можете запустить Ray и определить пульт дистанционного управления функция
import ray ray.init() @ray.remote(num_return_vals=3) def calc_stuff(parameter=None): # Do something. return 1, 2, 3
а затем вызвать его параллельно
output1, output2, output3 = [], [], [] # Launch the tasks. for j in range(10): id1, id2, id3 = calc_stuff.remote(parameter=j) output1.append(id1) output2.append(id2) output3.append(id3) # Block until the results have finished and get the results. output1 = ray.get(output1) output2 = ray.get(output2) output3 = ray.get(output3)
чтобы запустить тот же пример в кластере, единственной линией, которая изменится, будет вызов ray.инициализация.)( Соответствующую документацию можно найти здесь.
обратите внимание, что я помогаю развивать Рэй.
Это может быть полезно при реализации многопроцессорных и параллельных/ распределенных вычислений в Python.
YouTube учебник по использованию пакета techila
Techila-это промежуточное программное обеспечение для распределенных вычислений, которое интегрируется непосредственно с Python с помощью пакета techila. Функция peach в пакете может быть полезна при распараллеливании петлевых структур. (Следующий фрагмент кода из Форумы Сообщества Techila)
techila.peach(funcname = 'theheavyalgorithm', # Function that will be called on the compute nodes/ Workers files = 'theheavyalgorithm.py', # Python-file that will be sourced on Workers jobs = jobcount # Number of Jobs in the Project )
from joblib import Parallel, delayed import multiprocessing inputs = range(10) def processInput(i): return i * i num_cores = multiprocessing.cpu_count() results = Parallel(n_jobs=num_cores)(delayed(processInput)(i) for i in inputs) print(results)
выше прекрасно работает на моей машине (Ubuntu, пакет joblib был предварительно установлен, но может быть установлен через
pip install joblib
).взято из https://blog.dominodatalab.com/simple-parallelization/
очень простой пример параллельной обработки
from multiprocessing import Process output1 = list() output2 = list() output3 = list() def yourfunction(): for j in range(0, 10): # calc individual parameter value parameter = j * offset # call the calculation out1, out2, out3 = calc_stuff(parameter = parameter) # put results into correct output list output1.append(out1) output2.append(out2) output3.append(out3) if __name__ == '__main__': p = Process(target=pa.yourfunction, args=('bob',)) p.start() p.join()
взгляните на это:
http://docs.python.org/library/queue.html
это может быть не правильный способ сделать это, но я бы сделал что-то вроде;
фактический код;
from multiprocessing import Process, JoinableQueue as Queue class CustomWorker(Process): def __init__(self,workQueue, out1,out2,out3): Process.__init__(self) self.input=workQueue self.out1=out1 self.out2=out2 self.out3=out3 def run(self): while True: try: value = self.input.get() #value modifier temp1,temp2,temp3 = self.calc_stuff(value) self.out1.put(temp1) self.out2.put(temp2) self.out3.put(temp3) self.input.task_done() except Queue.Empty: return #Catch things better here def calc_stuff(self,param): out1 = param * 2 out2 = param * 4 out3 = param * 8 return out1,out2,out3 def Main(): inputQueue = Queue() for i in range(10): inputQueue.put(i) out1 = Queue() out2 = Queue() out3 = Queue() processes = [] for x in range(2): p = CustomWorker(inputQueue,out1,out2,out3) p.daemon = True p.start() processes.append(p) inputQueue.join() while(not out1.empty()): print out1.get() print out2.get() print out3.get() if __name__ == '__main__': Main()
надеюсь, что это поможет.