Исключение, вызванное в многопроцессорном пуле, не обнаружено


Кажется, что при возникновении исключения из многопроцессорной обработки.Процесс пула, нет трассировки стека или каких-либо других признаков того, что он не удался. Пример:

from multiprocessing import Pool 

def go():
    print(1)
    raise Exception()
    print(2)

p = Pool()
p.apply_async(go)
p.close()
p.join()

выводит 1 и молча останавливается. Интересно, что вместо этого работает исключение BaseException. Есть ли способ сделать поведение для всех исключений таким же, как BaseException?

9 58

9 ответов:

у меня есть разумное решение проблемы, по крайней мере для целей отладки. В настоящее время у меня нет решения, которое вызовет исключение обратно в основные процессы. Моя первая мысль была использовать декоратор, но вы можете только мариновать функции, определенные на верхнем уровне модуля, так что это прямо.

вместо этого простой класс обертывания и подкласс Пула, который использует это для apply_async (и, следовательно,apply). Я уйду map_async как упражнение для читатель.

import traceback
from multiprocessing.pool import Pool
import multiprocessing

# Shortcut to multiprocessing's logger
def error(msg, *args):
    return multiprocessing.get_logger().error(msg, *args)

class LogExceptions(object):
    def __init__(self, callable):
        self.__callable = callable

    def __call__(self, *args, **kwargs):
        try:
            result = self.__callable(*args, **kwargs)

        except Exception as e:
            # Here we add some debugging help. If multiprocessing's
            # debugging is on, it will arrange to log the traceback
            error(traceback.format_exc())
            # Re-raise the original exception so the Pool worker can
            # clean up
            raise

        # It was fine, give a normal answer
        return result

class LoggingPool(Pool):
    def apply_async(self, func, args=(), kwds={}, callback=None):
        return Pool.apply_async(self, LogExceptions(func), args, kwds, callback)

def go():
    print(1)
    raise Exception()
    print(2)

multiprocessing.log_to_stderr()
p = LoggingPool(processes=1)

p.apply_async(go)
p.close()
p.join()

это дает мне:

1
[ERROR/PoolWorker-1] Traceback (most recent call last):
  File "mpdebug.py", line 24, in __call__
    result = self.__callable(*args, **kwargs)
  File "mpdebug.py", line 44, in go
    raise Exception()
Exception

может быть, я что-то упускаю, но разве это не то, что get метод результата возвращает объект? Смотрите Процесс, Бассейны.

класс многопроцессорной обработки.бассейн.Asyncresult, к

класс результата, возвращаемого пулом.apply_async() и бассейн.map_async ().get ([timeout])
Верните результат, когда он прибудет. Если тайм-аут не равен None и результат не поступает в пределах тайм-аут секунд, то многопроцессорная обработка.TimeoutError поднят. Если дистанционное управление вызов вызвал исключение, то это исключение будет повторно вызвано get ().

Итак, слегка изменив свой пример, можно сделать

from multiprocessing import Pool

def go():
    print(1)
    raise Exception("foobar")
    print(2)

p = Pool()
x = p.apply_async(go)
x.get()
p.close()
p.join()

что дает в результате

1
Traceback (most recent call last):
  File "rob.py", line 10, in <module>
    x.get()
  File "/usr/lib/python2.6/multiprocessing/pool.py", line 422, in get
    raise self._value
Exception: foobar

Это не совсем удовлетворительно, так как он не печатает трассировку, но лучше, чем ничего.

обновление: эта ошибка была исправлена в Python 3.4, любезно Ричард Аудкерк. Смотрите в выпуске получить метод многопроцессорной обработки.бассейн.Асинхронный должен вернуть полную трассировку.

решение с наибольшим количеством голосов на момент написания имеет проблему:

from multiprocessing import Pool

def go():
    print(1)
    raise Exception("foobar")
    print(2)

p = Pool()
x = p.apply_async(go)
x.get()  ## waiting here for go() to complete...
p.close()
p.join()

как отметил @dfrankow, он будет ждать x.get(), что разрушает точку запуска задачи асинхронно. Итак, для большей эффективности (в частности, если ваша рабочая функция go занимает много времени) я бы изменил его на:

from multiprocessing import Pool

def go(x):
    print(1)
    # task_that_takes_a_long_time()
    raise Exception("Can't go anywhere.")
    print(2)
    return x**2

p = Pool()
results = []
for x in range(1000):
    results.append( p.apply_async(go, [x]) )

p.close()

for r in results:
     r.get()

преимущества: рабочая функция выполняется асинхронно, поэтому если, например, вы выполняете много задач на нескольких ядрах, это будет намного эффективнее, чем оригинальное решение.

недостатки: если в рабочей функции есть исключение, оно будет вызвано только после пул выполнил все задачи. Это может быть или не быть желательным поведением. отредактировано в соответствии с комментарием @colinfang, который исправил это.

Я имел успех регистрации исключений с этим декоратором:

import traceback, functools, multiprocessing

def trace_unhandled_exceptions(func):
    @functools.wraps(func)
    def wrapped_func(*args, **kwargs):
        try:
            func(*args, **kwargs)
        except:
            print 'Exception in '+func.__name__
            traceback.print_exc()
    return wrapped_func

С кодом в вопросе, это

@trace_unhandled_exceptions
def go():
    print(1)
    raise Exception()
    print(2)

p = multiprocessing.Pool(1)

p.apply_async(go)
p.close()
p.join()

просто украсьте функцию, которую вы передаете в свой пул процессов. Ключ к этой работе -@functools.wraps(func) в противном случае многопроцессорных бросает!--4-->.

код выше дает

1
Exception in go
Traceback (most recent call last):
  File "<stdin>", line 5, in wrapped_func
  File "<stdin>", line 4, in go
Exception
import logging
from multiprocessing import Pool

def proc_wrapper(func, *args, **kwargs):
    """Print exception because multiprocessing lib doesn't return them right."""
    try:
        return func(*args, **kwargs)
    except Exception as e:
        logging.exception(e)
        raise

def go(x):
    print x
    raise Exception("foobar")

p = Pool()
p.apply_async(proc_wrapper, (go, 5))
p.join()
p.close()

Я создал модуль RemoteException.py это показывает полную трассировку исключения в процессе. Вместо python2. скачать и добавьте это в свой код:

import RemoteException

@RemoteException.showError
def go():
    raise Exception('Error!')

if __name__ == '__main__':
    import multiprocessing
    p = multiprocessing.Pool(processes = 1)
    r = p.apply(go) # full traceback is shown here

Я бы попробовал использовать pdb:

import pdb
import sys
def handler(type, value, tb):
  pdb.pm()
sys.excepthook = handler

Так как вы использовали apply_sync, Я думаю, что вариант использования-это сделать некоторые задачи синхронизации. Использование обратного вызова для обработки является еще одним вариантом. Обратите внимание, что эта опция доступна только для python3.2 и выше и недоступна на python2.7.

from multiprocessing import Pool

def callback(result):
    print('success', result)

def callback_error(result):
    print('error', result)

def go():
    print(1)
    raise Exception()
    print(2)

p = Pool()
p.apply_async(go, callback=callback, error_callback=callback_error)

# You can do another things

p.close()
p.join()

так как уже есть достойные ответы для multiprocessing.Pool доступно, я предоставлю решение, используя другой подход для полноты.

на python >= 3.2 следующее решение кажется самым простым:

from concurrent.futures import ProcessPoolExecutor, wait

def go():
    print(1)
    raise Exception()
    print(2)


futures = []
with ProcessPoolExecutor() as p:
    for i in range(10):
        futures.append(p.submit(go))

results = [f.result() for f in futures]

плюсы:

  • очень мало кода
  • вызывает исключение в основном технологическом процессе
  • обеспечивает трассировку стека
  • нет внешних зависимостей

для получения дополнительной информации о API пожалуйста, проверьте: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor

кроме того, если вы отправляете большое количество задач и вы хотите, чтобы ваш основной процесс на неудачу, как только одна из ваших задач не удается, вы можете использовать следующий фрагмент:

from concurrent.futures import ProcessPoolExecutor, wait, FIRST_EXCEPTION, as_completed
import time


def go():
    print(1)
    time.sleep(0.3)
    raise Exception()
    print(2)


futures = []
with ProcessPoolExecutor(1) as p:
    for i in range(10):
        futures.append(p.submit(go))

    for f in as_completed(futures):
        if f.exception() is not None:
            for f in futures:
                f.cancel()
            break

[f.result() for f in futures]

все остальные ответы не выполняются только после выполнения всех задач.