Как мне распараллелить простой цикл в Python?


Это, вероятно, тривиальный вопрос, но как мне распараллелить следующий цикл в python?

# setup output lists
output1 = list()
output2 = list()
output3 = list()

for j in range(0, 10):
    # calc individual parameter value
    parameter = j * offset
    # call the calculation
    out1, out2, out3 = calc_stuff(parameter = parameter)

    # put results into correct output list
    output1.append(out1)
    output2.append(out2)
    output3.append(out3)

Я знаю, как запустить отдельные потоки в Python, но я не знаю, как "собрать" результаты.

несколько процессов тоже было бы хорошо - все, что проще всего для этого случая. Я использую в настоящее время Linux, но код должен работать на Windows и Mac, а-хорошо.

какой самый простой способ распараллелить этот код?

9 138

9 ответов:

использование нескольких потоков на CPython не даст вам лучшей производительности для кода pure-Python из-за глобальной блокировки интерпретатора (GIL). Я предлагаю использовать multiprocessing модуль вместо:

pool = multiprocessing.Pool(4)
out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))

обратите внимание, что это не будет работать в интерактивном интерпретаторе.

чтобы избежать обычного FUD вокруг GIL: в любом случае не было бы никакого преимущества в использовании потоков для этого примера. Ты хочу использовать процессы, а не потоки, потому что они избежать целой кучи проблем.

распараллелить простой цикл for, joblib приносит много значения к Сырцовой пользе многопроцессорного. Не только короткий синтаксис, но и такие вещи, как прозрачная группировка итераций, когда они очень быстры (чтобы удалить накладные расходы) или захват трассировки дочернего процесса, чтобы иметь лучшую отчетность об ошибках.

отказ от ответственности: я автор joblib.

какой самый простой способ распараллелить этот код?

мне очень нравится concurrent.futures для этого доступно в Python3 начиная с версии 3.2 - и через backport до 2.6 и 2.7 на PyPi.

вы можете использовать потоки или процессы и использовать тот же интерфейс.

многопроцессорная обработка

положите это в файл - futuretest.py:

import concurrent.futures
import time, random               # add some random sleep time

offset = 2                        # you don't supply these so
def calc_stuff(parameter=None):   # these are examples.
    sleep_time = random.choice([0, 1, 2, 3, 4, 5])
    time.sleep(sleep_time)
    return parameter / 2, sleep_time, parameter * parameter

def procedure(j):                 # just factoring out the
    parameter = j * offset        # procedure
    # call the calculation
    return calc_stuff(parameter=parameter)

def main():
    output1 = list()
    output2 = list()
    output3 = list()
    start = time.time()           # let's see how long this takes

    # we can swap out ProcessPoolExecutor for ThreadPoolExecutor
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for out1, out2, out3 in executor.map(procedure, range(0, 10)):
            # put results into correct output list
            output1.append(out1)
            output2.append(out2)
            output3.append(out3)
    finish = time.time()
    # these kinds of format strings are only available on Python 3.6:
    # time to upgrade!
    print(f'original inputs: {repr(output1)}')
    print(f'total time to execute {sum(output2)} = sum({repr(output2)})')
    print(f'time saved by parallelizing: {sum(output2) - (finish-start)}')
    print(f'returned in order given: {repr(output3)}')

if __name__ == '__main__':
    main()

и вот вывод:

$ python3 -m futuretest
original inputs: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
total time to execute 33 = sum([0, 3, 3, 4, 3, 5, 1, 5, 5, 4])
time saved by parallellizing: 27.68999981880188
returned in order given: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324]

многопоточность

изменить сейчас ProcessPoolExecutor до ThreadPoolExecutor, и снова запустите модуль:

$ python3 -m futuretest
original inputs: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
total time to execute 19 = sum([0, 2, 3, 5, 2, 0, 0, 3, 3, 1])
time saved by parallellizing: 13.992000102996826
returned in order given: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324]

теперь вы сделали как многопоточность и многопроцессорность!

обратите внимание на производительность и использование обоих вместе.

выборка слишком мала для сравнения результатов.

однако я подозреваю, что многопоточность будет быстрее, чем многопроцессорная обработка в целом, особенно в Windows, поскольку Windows не работает поддержка разветвления, поэтому каждый новый процесс должен занять время для запуска. На Linux или Mac они, вероятно, будут ближе.

вы можете вложить несколько потоков в несколько процессов, но рекомендуется не использовать несколько потоков для выделения нескольких процессов.

почему вы не используете потоки и один мьютекс для защиты одного глобального списка?

import os
import re
import time
import sys
import thread

from threading import Thread

class thread_it(Thread):
    def __init__ (self,param):
        Thread.__init__(self)
        self.param = param
    def run(self):
        mutex.acquire()
        output.append(calc_stuff(self.param))
        mutex.release()   


threads = []
output = []
mutex = thread.allocate_lock()

for j in range(0, 10):
    current = thread_it(j * offset)
    threads.append(current)
    current.start()

for t in threads:
    t.join()

#here you have output list filled with data

имейте в виду, вы будете так же быстро, как ваш медленный поток

есть ряд преимуществ в использовании Рэй:

  • вы можете распараллеливать несколько машин в дополнение к нескольким ядрам (с тем же кодом).
  • эффективная обработка числовых данных через общую память (и сериализация нулевого копирования).
  • высокая пропускная способность задачи с распределенным планированием.
  • отказоустойчивость.

в вашем случае вы можете запустить Ray и определить пульт дистанционного управления функция

import ray

ray.init()

@ray.remote(num_return_vals=3)
def calc_stuff(parameter=None):
    # Do something.
    return 1, 2, 3

а затем вызвать его параллельно

output1, output2, output3 = [], [], []

# Launch the tasks.
for j in range(10):
    id1, id2, id3 = calc_stuff.remote(parameter=j)
    output1.append(id1)
    output2.append(id2)
    output3.append(id3)

# Block until the results have finished and get the results.
output1 = ray.get(output1)
output2 = ray.get(output2)
output3 = ray.get(output3)

чтобы запустить тот же пример в кластере, единственной линией, которая изменится, будет вызов ray.инициализация.)( Соответствующую документацию можно найти здесь.

обратите внимание, что я помогаю развивать Рэй.

Это может быть полезно при реализации многопроцессорных и параллельных/ распределенных вычислений в Python.

YouTube учебник по использованию пакета techila

Techila-это промежуточное программное обеспечение для распределенных вычислений, которое интегрируется непосредственно с Python с помощью пакета techila. Функция peach в пакете может быть полезна при распараллеливании петлевых структур. (Следующий фрагмент кода из Форумы Сообщества Techila)

techila.peach(funcname = 'theheavyalgorithm', # Function that will be called on the compute nodes/ Workers
    files = 'theheavyalgorithm.py', # Python-file that will be sourced on Workers
    jobs = jobcount # Number of Jobs in the Project
    )
from joblib import Parallel, delayed
import multiprocessing

inputs = range(10) 
def processInput(i):
    return i * i

num_cores = multiprocessing.cpu_count()

results = Parallel(n_jobs=num_cores)(delayed(processInput)(i) for i in inputs)
print(results)

выше прекрасно работает на моей машине (Ubuntu, пакет joblib был предварительно установлен, но может быть установлен через pip install joblib).

взято из https://blog.dominodatalab.com/simple-parallelization/

очень простой пример параллельной обработки

from multiprocessing import Process
output1 = list()
output2 = list()
output3 = list()

def yourfunction():

    for j in range(0, 10):
        # calc individual parameter value
        parameter = j * offset
        # call the calculation
        out1, out2, out3 = calc_stuff(parameter = parameter)

        # put results into correct output list
        output1.append(out1)
        output2.append(out2)
        output3.append(out3)
if __name__ == '__main__':
p = Process(target=pa.yourfunction, args=('bob',))
p.start()
p.join()

взгляните на это:

http://docs.python.org/library/queue.html

это может быть не правильный способ сделать это, но я бы сделал что-то вроде;

фактический код;

from multiprocessing import Process, JoinableQueue as Queue 

class CustomWorker(Process):
    def __init__(self,workQueue, out1,out2,out3):
        Process.__init__(self)
        self.input=workQueue
        self.out1=out1
        self.out2=out2
        self.out3=out3
    def run(self):
            while True:
                try:
                    value = self.input.get()
                    #value modifier
                    temp1,temp2,temp3 = self.calc_stuff(value)
                    self.out1.put(temp1)
                    self.out2.put(temp2)
                    self.out3.put(temp3)
                    self.input.task_done()
                except Queue.Empty:
                    return
                   #Catch things better here
    def calc_stuff(self,param):
        out1 = param * 2
        out2 = param * 4
        out3 = param * 8
        return out1,out2,out3
def Main():
    inputQueue = Queue()
    for i in range(10):
        inputQueue.put(i)
    out1 = Queue()
    out2 = Queue()
    out3 = Queue()
    processes = []
    for x in range(2):
          p = CustomWorker(inputQueue,out1,out2,out3)
          p.daemon = True
          p.start()
          processes.append(p)
    inputQueue.join()
    while(not out1.empty()):
        print out1.get()
        print out2.get()
        print out3.get()
if __name__ == '__main__':
    Main()

надеюсь, что это поможет.