Python: многопроцессорная обработка.map: если один процесс вызывает исключение, почему не вызываются блоки finally других процессов?
Мое понимание таково, что, наконец, предложения должны *всегда* выполняться, если была введена попытка.
import random
from multiprocessing import Pool
from time import sleep
def Process(x):
try:
print x
sleep(random.random())
raise Exception('Exception: ' + x)
finally:
print 'Finally: ' + x
Pool(3).map(Process, ['1','2','3'])
Ожидаемый результат состоит в том, что для каждого из x, который печатается самостоятельно строкой 8, должно быть появление 'Finally x'.
Пример вывода:
$ python bug.py
1
2
3
Finally: 2
Traceback (most recent call last):
File "bug.py", line 14, in <module>
Pool(3).map(Process, ['1','2','3'])
File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 225, in map
return self.map_async(func, iterable, chunksize).get()
File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 522, in get
raise self._value
Exception: Exception: 2
Кажется, что исключение, завершающее один процесс, завершает Родительский и родственный процессы, хотя есть дальнейшая работа , необходимая для выполнения в других процессах. процессы.
Почему я ошибаюсь? Почему это правильно? Если это верно, то как следует безопасно очищать ресурсы в многопроцессном Python?
3 ответа:
Короткий ответ:
SIGTERM
козыриfinally
.Длинный ответ: включите ведение журнала с помощью
mp.log_to_stderr()
:import random import multiprocessing as mp import time import logging logger=mp.log_to_stderr(logging.DEBUG) def Process(x): try: logger.info(x) time.sleep(random.random()) raise Exception('Exception: ' + x) finally: logger.info('Finally: ' + x) result=mp.Pool(3).map(Process, ['1','2','3'])
Выходные данные журнала включают:
[DEBUG/MainProcess] terminating workers
, который соответствует этому коду в
multiprocessing.pool._terminate_pool
:if pool and hasattr(pool[0], 'terminate'): debug('terminating workers') for p in pool: p.terminate()
Каждый
p
вpool
являетсяmultiprocessing.Process
, а вызовterminate
(по крайней мере, на машинах без Windows) вызывает SIGTERM:Из
multiprocessing/forking.py
:class Popen(object) def terminate(self): ... try: os.kill(self.pid, signal.SIGTERM) except OSError, e: if self.wait(timeout=0.1) is None: raise
Таким образом, это сводится к тому, что происходит, когда процесс Python в наборе
try
отправляетсяSIGTERM
.Рассмотрим следующий пример (test.py):
import time def worker(): try: time.sleep(100) finally: print('enter finally') time.sleep(2) print('exit finally') worker()
Если вы запустите его, а затем передадите ему
SIGTERM
, то процесс завершится немедленно, без входа вfinally
набор, о чем свидетельствует отсутствие вывода и отсутствие задержки.В одном терминале:
% test.py
Во втором терминале:
% pkill -TERM -f "test.py"
Результат в первом терминале:
Terminated
Сравните это с тем, что происходит, когда процесс отправляется a
SIGINT
(C-c
):Во втором терминал:
% pkill -INT -f "test.py"
Результат в первом терминале:
enter finally exit finally Traceback (most recent call last): File "/home/unutbu/pybin/test.py", line 14, in <module> worker() File "/home/unutbu/pybin/test.py", line 8, in worker time.sleep(100) KeyboardInterrupt
Вывод:
SIGTERM
козыриfinally
.
Ответ отunutbu определенно объясняет, Почему вы получаете поведение, которое вы наблюдаете. Однако следует подчеркнуть, что SIGTERM отправляется только из - за того, как
multiprocessing.pool._terminate_pool
реализуется. Если вы можете избежать использованияPool
, то вы можете получить желаемое поведение. Вот заимствованный пример :from multiprocessing import Process from time import sleep import random def f(x): try: sleep(random.random()*10) raise Exception except: print "Caught exception in process:", x # Make this last longer than the except clause in main. sleep(3) finally: print "Cleaning up process:", x if __name__ == '__main__': processes = [] for i in range(4): p = Process(target=f, args=(i,)) p.start() processes.append(p) try: for process in processes: process.join() except: print "Caught exception in main." finally: print "Cleaning up main."
После отправки SIGINT, пример вывода:
Обратите внимание, что предложениеCaught exception in process: 0 ^C Cleaning up process: 0 Caught exception in main. Cleaning up main. Caught exception in process: 1 Caught exception in process: 2 Caught exception in process: 3 Cleaning up process: 1 Cleaning up process: 2 Cleaning up process: 3
finally
выполняется для всех процессов. Если вам нужна общая память, рассмотрите возможность использованияQueue
,Pipe
,Manager
, или какой-нибудь внешний магазин, напримерredis
илиsqlite3
.
finally
повторно вызывает исходное исключение , Если выreturn
из него. Исключение затем вызываетсяPool.map
и убивает все ваше приложение. Подпроцессы завершаются, и вы не видите никаких других исключений.Вы можете добавить
return
, чтобы проглотить исключение:def Process(x): try: print x sleep(random.random()) raise Exception('Exception: ' + x) finally: print 'Finally: ' + x return
Тогда вы должны иметь
None
в вашемmap
результате, когда произошло исключение.