Питон многопроцессорной ошибка мариновать


Мне жаль, что я не могу воспроизвести ошибку с более простым примером, и мой код слишком сложен для публикации. Если я запускаю программу в оболочке IPython вместо обычного python, все работает хорошо.

Я посмотрел некоторые предыдущие заметки по этой проблеме. Все они были вызваны использованием пула для вызова функции, определенной в функции класса. Но это не относится ко мне.

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

Я был бы признателен за любую помощь.

обновление: функция I рассол определяется на верхнем уровне модуля. Хотя он вызывает функцию, которая содержит вложенную функцию. т. е. F() вызывает g() вызывает h(), который имеет вложенную функцию i (), и я вызываю пул.apply_async (f). f(), g (), h () все определены на верхнем уровне. Я попробовал более простой пример с этим шаблоном, и он работает, хотя.

7 156

7 ответов:

здесь список того, что можно мариновать. В частности, функции можно выбрать только в том случае, если они определены на верхнем уровне модуля.

этот кусок кода:

import multiprocessing as mp

class Foo():
    @staticmethod
    def work(self):
        pass

pool = mp.Pool()
foo = Foo()
pool.apply_async(foo.work)
pool.close()
pool.join()

дает ошибку, почти идентичную той, которую вы опубликовали:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 315, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

проблема в том, что pool методы Все использовать queue.Queue для передачи задач рабочим процессам. Все, что проходит через queue.Queue должно быть pickable, и foo.work не picklable, так как он не определен на верхнем уровне модуля.

это можно исправить, определив функцию на верхнем уровне, которая вызывает foo.work():

def work(foo):
    foo.work()

pool.apply_async(work,args=(foo,))

обратите внимание, что foo можно выбрать, так как Foo определен на верхнем уровне и foo.__dict__ маринуется.

Я хотел бы использовать pathos.multiprocesssing, вместо multiprocessing. pathos.multiprocessing - это форк multiprocessing использует dill. dill может сериализовать почти все, что в python, так что вы можете отправить гораздо больше вокруг параллельно. Элемент pathos fork также имеет возможность работать непосредственно с несколькими функциями аргументов, как вам нужно для методов класса.

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> p = Pool(4)
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]
>>> 
>>> class Foo(object):
...   @staticmethod
...   def work(self, x):
...     return x+1
... 
>>> f = Foo()
>>> p.apipe(f.work, f, 100)
<processing.pool.ApplyResult object at 0x10504f8d0>
>>> res = _
>>> res.get()
101

Get pathos (и если хотите, dill) вот: https://github.com/uqfoundation

как уже сказал multiprocessing может передавать только объекты Python рабочим процессам, которые могут быть замаринованы. Если вы не можете реорганизовать свой код, как описано unutbu, вы можете использовать dillрасширенные возможности травления/распаковки для передачи данных (особенно данных кода), как я показываю ниже.

это решение требует только установки dill и никаких других библиотек как pathos:

import os
from multiprocessing import Pool

import dill


def run_dill_encoded(payload):
    fun, args = dill.loads(payload)
    return fun(*args)


def apply_async(pool, fun, args):
    payload = dill.dumps((fun, args))
    return pool.apply_async(run_dill_encoded, (payload,))


if __name__ == "__main__":

    pool = Pool(processes=5)

    # asyn execution of lambda
    jobs = []
    for i in range(10):
        job = apply_async(pool, lambda a, b: (a, b, a * b), (i, i + 1))
        jobs.append(job)

    for job in jobs:
        print job.get()
    print

    # async execution of static method

    class O(object):

        @staticmethod
        def calc():
            return os.getpid()

    jobs = []
    for i in range(10):
        job = apply_async(pool, O.calc, ())
        jobs.append(job)

    for job in jobs:
        print job.get()

Я обнаружил, что я также могу генерировать именно этот вывод ошибок на отлично работающем фрагменте кода, пытаясь использовать профилировщик на нем.

обратите внимание, что это было на Windows (где разветвление немного менее элегантно).

Я бежал:

python -m profile -o output.pstats <script> 

и обнаружил, что удаление профилирования удалило ошибку и размещение профилирования восстановило ее. Это тоже сводило меня с ума, потому что я знал, что код работает. Я проверяю, чтобы увидеть, если что-то было обновлено pool.py... потом у меня возникло предчувствие провала и я устранил профилирование, и все.

размещение здесь для архивов на случай, если кто-то еще столкнется с этим.

это решение требует только установки укропа и никаких других библиотек в качестве пафоса

def apply_packed_function_for_map((dumped_function, item, args, kwargs),):
    """
    Unpack dumped function as target function and call it with arguments.

    :param (dumped_function, item, args, kwargs):
        a tuple of dumped function and its arguments
    :return:
        result of target function
    """
    target_function = dill.loads(dumped_function)
    res = target_function(item, *args, **kwargs)
    return res


def pack_function_for_map(target_function, items, *args, **kwargs):
    """
    Pack function and arguments to object that can be sent from one
    multiprocessing.Process to another. The main problem is:
        «multiprocessing.Pool.map*» or «apply*»
        cannot use class methods or closures.
    It solves this problem with «dill».
    It works with target function as argument, dumps it («with dill»)
    and returns dumped function with arguments of target function.
    For more performance we dump only target function itself
    and don't dump its arguments.
    How to use (pseudo-code):

        ~>>> import multiprocessing
        ~>>> images = [...]
        ~>>> pool = multiprocessing.Pool(100500)
        ~>>> features = pool.map(
        ~...     *pack_function_for_map(
        ~...         super(Extractor, self).extract_features,
        ~...         images,
        ~...         type='png'
        ~...         **options,
        ~...     )
        ~... )
        ~>>>

    :param target_function:
        function, that you want to execute like  target_function(item, *args, **kwargs).
    :param items:
        list of items for map
    :param args:
        positional arguments for target_function(item, *args, **kwargs)
    :param kwargs:
        named arguments for target_function(item, *args, **kwargs)
    :return: tuple(function_wrapper, dumped_items)
        It returs a tuple with
            * function wrapper, that unpack and call target function;
            * list of packed target function and its' arguments.
    """
    dumped_function = dill.dumps(target_function)
    dumped_items = [(dumped_function, item, args, kwargs) for item in items]
    return apply_packed_function_for_map, dumped_items

Он также работает для массивов numpy.

Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

эта ошибка также придет, если у вас есть встроенная функция внутри объекта модели, который был передан в асинхронной работы.

так что не забудьте проверить модель, которые передаются не имеют встроенных функций. (В нашем случае мы использовали FieldTracker() функции django-model-utils внутри модели для отслеживания определенной области). Вот это ссылке к соответствующему вопросу GitHub.

вы случайно не передаете массив строк numpy?

У меня была такая же точная ошибка, когда я передаю массив, который содержит пустую строку. Я думаю, что это может быть из-за этой ошибки: http://projects.scipy.org/numpy/ticket/1658