Объекты общей памяти в многопроцессорной обработке


Предположим, у меня есть большой массив numpy в памяти, у меня есть функция func в этот гигантский массив в качестве входных данных (вместе с некоторыми другими параметрами). func С различными параметрами можно побежать параллельно. Например:

def func(arr, param):
    # do stuff to arr, param

# build array arr

pool = Pool(processes = 6)
results = [pool.apply_async(func, [arr, param]) for param in all_params]
output = [res.get() for res in results]

если я использую многопроцессорную библиотеку, то этот гигантский массив будет скопирован несколько раз в разные процессы.

есть ли способ позволить различным процессам совместно использовать один и тот же массив? Этот объект массива доступен только для чтения и никогда не будет изменен.

что сложнее, если arr-это не массив, а произвольный объект python, есть ли способ поделиться им?

[отредактировано]

Я прочитал ответ, но я все еще немного смущен. Поскольку fork () является copy-on-write, мы не должны вызывать никаких дополнительных затрат при создании новых процессов в многопроцессорной библиотеке python. Но следующий код предполагает, что есть огромные накладные расходы:

from multiprocessing import Pool, Manager
import numpy as np; 
import time

def f(arr):
    return len(arr)

t = time.time()
arr = np.arange(10000000)
print "construct array = ", time.time() - t;


pool = Pool(processes = 6)

t = time.time()
res = pool.apply_async(f, [arr,])
res.get()
print "multiprocessing overhead = ", time.time() - t;

выход (и кстати, стоимость возрастает по мере увеличения размера массива, поэтому я подозреваю, что есть еще накладные расходы, связанные с копированием памяти):

construct array =  0.0178790092468
multiprocessing overhead =  0.252444982529

почему такие огромные накладные расходы, если мы не копировали массив? И какая часть общей памяти спасает меня?

2 82

2 ответа:

если вы используете операционную систему, которая использует copy-on-write fork() семантика (как и любой обычный unix), то до тех пор, пока вы никогда не измените структуру данных, она будет доступна для всех дочерних процессов, не занимая дополнительную память. Вам не придется делать ничего особенного (за исключением того, что вы абсолютно уверены, что не изменяете объект).

самая эффективная вещь вы может сделать для вашей проблемы было бы упаковать ваш массив в эффективный массив структура (с помощью numpy или array), поместите это в общую память, оберните его с multiprocessing.Array, и передайте это своим функциям. этот ответ показывает, как это сделать.

если вы хотите writeable общий объект, то вам нужно обернуть его с какой-то синхронизации или замок. multiprocessing предоставляет два способа сделать это: один с использованием общей памяти (подходит для простых значений, массивов или типов ctypes) или Manager прокси-сервер, где один процесс удерживает память, а менеджер определяет доступ к ней из других процессов (даже по сети).

The Manager подход может быть использован с произвольными объектами Python, но будет медленнее, чем эквивалент с использованием общей памяти, потому что объекты должны быть сериализованы/десериализованы и отправлены между процессами.

здесь множество библиотек параллельной обработки и подходов, доступных в Python. multiprocessing отлично и хорошо округленная библиотека, но если у вас есть особые потребности, возможно, один из других подходов может быть лучше.

я столкнулся с той же проблемой и написал небольшой класс утилиты с общей памятью, чтобы обойти ее.

Я использую многопроцессорной обработки.RawArray (lockfree), а также доступ к массивам не синхронизирован вообще (lockfree), будьте осторожны, чтобы не стрелять своими ногами.

с решением я получаю ускорение примерно в 3 раза на четырехъядерном i7.

вот код: Не стесняйтесь использовать и улучшать его, и, пожалуйста, сообщите о любых ошибках.

'''
Created on 14.05.2013

@author: martin
'''

import multiprocessing
import ctypes
import numpy as np

class SharedNumpyMemManagerError(Exception):
    pass

'''
Singleton Pattern
'''
class SharedNumpyMemManager:    

    _initSize = 1024

    _instance = None

    def __new__(cls, *args, **kwargs):
        if not cls._instance:
            cls._instance = super(SharedNumpyMemManager, cls).__new__(
                                cls, *args, **kwargs)
        return cls._instance        

    def __init__(self):
        self.lock = multiprocessing.Lock()
        self.cur = 0
        self.cnt = 0
        self.shared_arrays = [None] * SharedNumpyMemManager._initSize

    def __createArray(self, dimensions, ctype=ctypes.c_double):

        self.lock.acquire()

        # double size if necessary
        if (self.cnt >= len(self.shared_arrays)):
            self.shared_arrays = self.shared_arrays + [None] * len(self.shared_arrays)

        # next handle
        self.__getNextFreeHdl()        

        # create array in shared memory segment
        shared_array_base = multiprocessing.RawArray(ctype, np.prod(dimensions))

        # convert to numpy array vie ctypeslib
        self.shared_arrays[self.cur] = np.ctypeslib.as_array(shared_array_base)

        # do a reshape for correct dimensions            
        # Returns a masked array containing the same data, but with a new shape.
        # The result is a view on the original array
        self.shared_arrays[self.cur] = self.shared_arrays[self.cnt].reshape(dimensions)

        # update cnt
        self.cnt += 1

        self.lock.release()

        # return handle to the shared memory numpy array
        return self.cur

    def __getNextFreeHdl(self):
        orgCur = self.cur
        while self.shared_arrays[self.cur] is not None:
            self.cur = (self.cur + 1) % len(self.shared_arrays)
            if orgCur == self.cur:
                raise SharedNumpyMemManagerError('Max Number of Shared Numpy Arrays Exceeded!')

    def __freeArray(self, hdl):
        self.lock.acquire()
        # set reference to None
        if self.shared_arrays[hdl] is not None: # consider multiple calls to free
            self.shared_arrays[hdl] = None
            self.cnt -= 1
        self.lock.release()

    def __getArray(self, i):
        return self.shared_arrays[i]

    @staticmethod
    def getInstance():
        if not SharedNumpyMemManager._instance:
            SharedNumpyMemManager._instance = SharedNumpyMemManager()
        return SharedNumpyMemManager._instance

    @staticmethod
    def createArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__createArray(*args, **kwargs)

    @staticmethod
    def getArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__getArray(*args, **kwargs)

    @staticmethod    
    def freeArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__freeArray(*args, **kwargs)

# Init Singleton on module load
SharedNumpyMemManager.getInstance()

if __name__ == '__main__':

    import timeit

    N_PROC = 8
    INNER_LOOP = 10000
    N = 1000

    def propagate(t):
        i, shm_hdl, evidence = t
        a = SharedNumpyMemManager.getArray(shm_hdl)
        for j in range(INNER_LOOP):
            a[i] = i

    class Parallel_Dummy_PF:

        def __init__(self, N):
            self.N = N
            self.arrayHdl = SharedNumpyMemManager.createArray(self.N, ctype=ctypes.c_double)            
            self.pool = multiprocessing.Pool(processes=N_PROC)

        def update_par(self, evidence):
            self.pool.map(propagate, zip(range(self.N), [self.arrayHdl] * self.N, [evidence] * self.N))

        def update_seq(self, evidence):
            for i in range(self.N):
                propagate((i, self.arrayHdl, evidence))

        def getArray(self):
            return SharedNumpyMemManager.getArray(self.arrayHdl)

    def parallelExec():
        pf = Parallel_Dummy_PF(N)
        print(pf.getArray())
        pf.update_par(5)
        print(pf.getArray())

    def sequentialExec():
        pf = Parallel_Dummy_PF(N)
        print(pf.getArray())
        pf.update_seq(5)
        print(pf.getArray())

    t1 = timeit.Timer("sequentialExec()", "from __main__ import sequentialExec")
    t2 = timeit.Timer("parallelExec()", "from __main__ import parallelExec")

    print("Sequential: ", t1.timeit(number=1))    
    print("Parallel: ", t2.timeit(number=1))