Питон многопроцессорной ошибка мариновать
Мне жаль, что я не могу воспроизвести ошибку с более простым примером, и мой код слишком сложен для публикации. Если я запускаю программу в оболочке IPython вместо обычного python, все работает хорошо.
Я посмотрел некоторые предыдущие заметки по этой проблеме. Все они были вызваны использованием пула для вызова функции, определенной в функции класса. Но это не относится ко мне.
Exception in thread Thread-3:
Traceback (most recent call last):
File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner
self.run()
File "/usr/lib64/python2.7/threading.py", line 505, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
Я был бы признателен за любую помощь.
обновление: функция I рассол определяется на верхнем уровне модуля. Хотя он вызывает функцию, которая содержит вложенную функцию. т. е. F() вызывает g() вызывает h(), который имеет вложенную функцию i (), и я вызываю пул.apply_async (f). f(), g (), h () все определены на верхнем уровне. Я попробовал более простой пример с этим шаблоном, и он работает, хотя.
7 ответов:
здесь список того, что можно мариновать. В частности, функции можно выбрать только в том случае, если они определены на верхнем уровне модуля.
этот кусок кода:
import multiprocessing as mp class Foo(): @staticmethod def work(self): pass pool = mp.Pool() foo = Foo() pool.apply_async(foo.work) pool.close() pool.join()
дает ошибку, почти идентичную той, которую вы опубликовали:
Exception in thread Thread-2: Traceback (most recent call last): File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner self.run() File "/usr/lib/python2.7/threading.py", line 505, in run self.__target(*self.__args, **self.__kwargs) File "/usr/lib/python2.7/multiprocessing/pool.py", line 315, in _handle_tasks put(task) PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
проблема в том, что
pool
методы Все использоватьqueue.Queue
для передачи задач рабочим процессам. Все, что проходит черезqueue.Queue
должно быть pickable, иfoo.work
не picklable, так как он не определен на верхнем уровне модуля.это можно исправить, определив функцию на верхнем уровне, которая вызывает
foo.work()
:def work(foo): foo.work() pool.apply_async(work,args=(foo,))
обратите внимание, что
foo
можно выбрать, так какFoo
определен на верхнем уровне иfoo.__dict__
маринуется.
Я хотел бы использовать
pathos.multiprocesssing
, вместоmultiprocessing
.pathos.multiprocessing
- это форкmultiprocessing
используетdill
.dill
может сериализовать почти все, что в python, так что вы можете отправить гораздо больше вокруг параллельно. Элементpathos
fork также имеет возможность работать непосредственно с несколькими функциями аргументов, как вам нужно для методов класса.>>> from pathos.multiprocessing import ProcessingPool as Pool >>> p = Pool(4) >>> class Test(object): ... def plus(self, x, y): ... return x+y ... >>> t = Test() >>> p.map(t.plus, x, y) [4, 6, 8, 10] >>> >>> class Foo(object): ... @staticmethod ... def work(self, x): ... return x+1 ... >>> f = Foo() >>> p.apipe(f.work, f, 100) <processing.pool.ApplyResult object at 0x10504f8d0> >>> res = _ >>> res.get() 101
Get
pathos
(и если хотите,dill
) вот: https://github.com/uqfoundation
как уже сказал
multiprocessing
может передавать только объекты Python рабочим процессам, которые могут быть замаринованы. Если вы не можете реорганизовать свой код, как описано unutbu, вы можете использоватьdill
расширенные возможности травления/распаковки для передачи данных (особенно данных кода), как я показываю ниже.это решение требует только установки
dill
и никаких других библиотек какpathos
:import os from multiprocessing import Pool import dill def run_dill_encoded(payload): fun, args = dill.loads(payload) return fun(*args) def apply_async(pool, fun, args): payload = dill.dumps((fun, args)) return pool.apply_async(run_dill_encoded, (payload,)) if __name__ == "__main__": pool = Pool(processes=5) # asyn execution of lambda jobs = [] for i in range(10): job = apply_async(pool, lambda a, b: (a, b, a * b), (i, i + 1)) jobs.append(job) for job in jobs: print job.get() print # async execution of static method class O(object): @staticmethod def calc(): return os.getpid() jobs = [] for i in range(10): job = apply_async(pool, O.calc, ()) jobs.append(job) for job in jobs: print job.get()
Я обнаружил, что я также могу генерировать именно этот вывод ошибок на отлично работающем фрагменте кода, пытаясь использовать профилировщик на нем.
обратите внимание, что это было на Windows (где разветвление немного менее элегантно).
Я бежал:
python -m profile -o output.pstats <script>
и обнаружил, что удаление профилирования удалило ошибку и размещение профилирования восстановило ее. Это тоже сводило меня с ума, потому что я знал, что код работает. Я проверяю, чтобы увидеть, если что-то было обновлено pool.py... потом у меня возникло предчувствие провала и я устранил профилирование, и все.
размещение здесь для архивов на случай, если кто-то еще столкнется с этим.
это решение требует только установки укропа и никаких других библиотек в качестве пафоса
def apply_packed_function_for_map((dumped_function, item, args, kwargs),): """ Unpack dumped function as target function and call it with arguments. :param (dumped_function, item, args, kwargs): a tuple of dumped function and its arguments :return: result of target function """ target_function = dill.loads(dumped_function) res = target_function(item, *args, **kwargs) return res def pack_function_for_map(target_function, items, *args, **kwargs): """ Pack function and arguments to object that can be sent from one multiprocessing.Process to another. The main problem is: «multiprocessing.Pool.map*» or «apply*» cannot use class methods or closures. It solves this problem with «dill». It works with target function as argument, dumps it («with dill») and returns dumped function with arguments of target function. For more performance we dump only target function itself and don't dump its arguments. How to use (pseudo-code): ~>>> import multiprocessing ~>>> images = [...] ~>>> pool = multiprocessing.Pool(100500) ~>>> features = pool.map( ~... *pack_function_for_map( ~... super(Extractor, self).extract_features, ~... images, ~... type='png' ~... **options, ~... ) ~... ) ~>>> :param target_function: function, that you want to execute like target_function(item, *args, **kwargs). :param items: list of items for map :param args: positional arguments for target_function(item, *args, **kwargs) :param kwargs: named arguments for target_function(item, *args, **kwargs) :return: tuple(function_wrapper, dumped_items) It returs a tuple with * function wrapper, that unpack and call target function; * list of packed target function and its' arguments. """ dumped_function = dill.dumps(target_function) dumped_items = [(dumped_function, item, args, kwargs) for item in items] return apply_packed_function_for_map, dumped_items
Он также работает для массивов numpy.
Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
эта ошибка также придет, если у вас есть встроенная функция внутри объекта модели, который был передан в асинхронной работы.
так что не забудьте проверить модель, которые передаются не имеют встроенных функций. (В нашем случае мы использовали
FieldTracker()
функции django-model-utils внутри модели для отслеживания определенной области). Вот это ссылке к соответствующему вопросу GitHub.
вы случайно не передаете массив строк numpy?
У меня была такая же точная ошибка, когда я передаю массив, который содержит пустую строку. Я думаю, что это может быть из-за этой ошибки: http://projects.scipy.org/numpy/ticket/1658