Исключение, вызванное в многопроцессорном пуле, не обнаружено
Кажется, что при возникновении исключения из многопроцессорной обработки.Процесс пула, нет трассировки стека или каких-либо других признаков того, что он не удался. Пример:
from multiprocessing import Pool
def go():
print(1)
raise Exception()
print(2)
p = Pool()
p.apply_async(go)
p.close()
p.join()
выводит 1 и молча останавливается. Интересно, что вместо этого работает исключение BaseException. Есть ли способ сделать поведение для всех исключений таким же, как BaseException?
9 ответов:
у меня есть разумное решение проблемы, по крайней мере для целей отладки. В настоящее время у меня нет решения, которое вызовет исключение обратно в основные процессы. Моя первая мысль была использовать декоратор, но вы можете только мариновать функции, определенные на верхнем уровне модуля, так что это прямо.
вместо этого простой класс обертывания и подкласс Пула, который использует это для
apply_async
(и, следовательно,apply
). Я уйдуmap_async
как упражнение для читатель.import traceback from multiprocessing.pool import Pool import multiprocessing # Shortcut to multiprocessing's logger def error(msg, *args): return multiprocessing.get_logger().error(msg, *args) class LogExceptions(object): def __init__(self, callable): self.__callable = callable def __call__(self, *args, **kwargs): try: result = self.__callable(*args, **kwargs) except Exception as e: # Here we add some debugging help. If multiprocessing's # debugging is on, it will arrange to log the traceback error(traceback.format_exc()) # Re-raise the original exception so the Pool worker can # clean up raise # It was fine, give a normal answer return result class LoggingPool(Pool): def apply_async(self, func, args=(), kwds={}, callback=None): return Pool.apply_async(self, LogExceptions(func), args, kwds, callback) def go(): print(1) raise Exception() print(2) multiprocessing.log_to_stderr() p = LoggingPool(processes=1) p.apply_async(go) p.close() p.join()
это дает мне:
1 [ERROR/PoolWorker-1] Traceback (most recent call last): File "mpdebug.py", line 24, in __call__ result = self.__callable(*args, **kwargs) File "mpdebug.py", line 44, in go raise Exception() Exception
может быть, я что-то упускаю, но разве это не то, что
get
метод результата возвращает объект? Смотрите Процесс, Бассейны.класс многопроцессорной обработки.бассейн.Asyncresult, к
класс результата, возвращаемого пулом.apply_async() и бассейн.map_async ().get ([timeout])
Верните результат, когда он прибудет. Если тайм-аут не равен None и результат не поступает в пределах тайм-аут секунд, то многопроцессорная обработка.TimeoutError поднят. Если дистанционное управление вызов вызвал исключение, то это исключение будет повторно вызвано get ().Итак, слегка изменив свой пример, можно сделать
from multiprocessing import Pool def go(): print(1) raise Exception("foobar") print(2) p = Pool() x = p.apply_async(go) x.get() p.close() p.join()
что дает в результате
1 Traceback (most recent call last): File "rob.py", line 10, in <module> x.get() File "/usr/lib/python2.6/multiprocessing/pool.py", line 422, in get raise self._value Exception: foobar
Это не совсем удовлетворительно, так как он не печатает трассировку, но лучше, чем ничего.
обновление: эта ошибка была исправлена в Python 3.4, любезно Ричард Аудкерк. Смотрите в выпуске получить метод многопроцессорной обработки.бассейн.Асинхронный должен вернуть полную трассировку.
решение с наибольшим количеством голосов на момент написания имеет проблему:
from multiprocessing import Pool def go(): print(1) raise Exception("foobar") print(2) p = Pool() x = p.apply_async(go) x.get() ## waiting here for go() to complete... p.close() p.join()
как отметил @dfrankow, он будет ждать
x.get()
, что разрушает точку запуска задачи асинхронно. Итак, для большей эффективности (в частности, если ваша рабочая функцияgo
занимает много времени) я бы изменил его на:from multiprocessing import Pool def go(x): print(1) # task_that_takes_a_long_time() raise Exception("Can't go anywhere.") print(2) return x**2 p = Pool() results = [] for x in range(1000): results.append( p.apply_async(go, [x]) ) p.close() for r in results: r.get()
преимущества: рабочая функция выполняется асинхронно, поэтому если, например, вы выполняете много задач на нескольких ядрах, это будет намного эффективнее, чем оригинальное решение.
недостатки: если в рабочей функции есть исключение, оно будет вызвано только после пул выполнил все задачи. Это может быть или не быть желательным поведением.отредактировано в соответствии с комментарием @colinfang, который исправил это.
Я имел успех регистрации исключений с этим декоратором:
import traceback, functools, multiprocessing def trace_unhandled_exceptions(func): @functools.wraps(func) def wrapped_func(*args, **kwargs): try: func(*args, **kwargs) except: print 'Exception in '+func.__name__ traceback.print_exc() return wrapped_func
С кодом в вопросе, это
@trace_unhandled_exceptions def go(): print(1) raise Exception() print(2) p = multiprocessing.Pool(1) p.apply_async(go) p.close() p.join()
просто украсьте функцию, которую вы передаете в свой пул процессов. Ключ к этой работе -
@functools.wraps(func)
в противном случае многопроцессорных бросает!--4-->.код выше дает
1 Exception in go Traceback (most recent call last): File "<stdin>", line 5, in wrapped_func File "<stdin>", line 4, in go Exception
import logging from multiprocessing import Pool def proc_wrapper(func, *args, **kwargs): """Print exception because multiprocessing lib doesn't return them right.""" try: return func(*args, **kwargs) except Exception as e: logging.exception(e) raise def go(x): print x raise Exception("foobar") p = Pool() p.apply_async(proc_wrapper, (go, 5)) p.join() p.close()
Я создал модуль RemoteException.py это показывает полную трассировку исключения в процессе. Вместо python2. скачать и добавьте это в свой код:
import RemoteException @RemoteException.showError def go(): raise Exception('Error!') if __name__ == '__main__': import multiprocessing p = multiprocessing.Pool(processes = 1) r = p.apply(go) # full traceback is shown here
Я бы попробовал использовать pdb:
import pdb import sys def handler(type, value, tb): pdb.pm() sys.excepthook = handler
Так как вы использовали
apply_sync
, Я думаю, что вариант использования-это сделать некоторые задачи синхронизации. Использование обратного вызова для обработки является еще одним вариантом. Обратите внимание, что эта опция доступна только для python3.2 и выше и недоступна на python2.7.from multiprocessing import Pool def callback(result): print('success', result) def callback_error(result): print('error', result) def go(): print(1) raise Exception() print(2) p = Pool() p.apply_async(go, callback=callback, error_callback=callback_error) # You can do another things p.close() p.join()
так как уже есть достойные ответы для
multiprocessing.Pool
доступно, я предоставлю решение, используя другой подход для полноты.на
python >= 3.2
следующее решение кажется самым простым:from concurrent.futures import ProcessPoolExecutor, wait def go(): print(1) raise Exception() print(2) futures = [] with ProcessPoolExecutor() as p: for i in range(10): futures.append(p.submit(go)) results = [f.result() for f in futures]
плюсы:
- очень мало кода
- вызывает исключение в основном технологическом процессе
- обеспечивает трассировку стека
- нет внешних зависимостей
для получения дополнительной информации о API пожалуйста, проверьте: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor
кроме того, если вы отправляете большое количество задач и вы хотите, чтобы ваш основной процесс на неудачу, как только одна из ваших задач не удается, вы можете использовать следующий фрагмент:
from concurrent.futures import ProcessPoolExecutor, wait, FIRST_EXCEPTION, as_completed import time def go(): print(1) time.sleep(0.3) raise Exception() print(2) futures = [] with ProcessPoolExecutor(1) as p: for i in range(10): futures.append(p.submit(go)) for f in as_completed(futures): if f.exception() is not None: for f in futures: f.cancel() break [f.result() for f in futures]
все остальные ответы не выполняются только после выполнения всех задач.