Прерываний клавиатуры с языка Python многопроцессорных бассейн


как я могу обрабатывать события KeyboardInterrupt с многопроцессорными пулами python? Вот простой пример:

from multiprocessing import Pool
from time import sleep
from sys import exit

def slowly_square(i):
    sleep(1)
    return i*i

def go():
    pool = Pool(8)
    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        # **** THIS PART NEVER EXECUTES. ****
        pool.terminate()
        print "You cancelled the program!"
        sys.exit(1)
    print "nFinally, here are the results: ", results

if __name__ == "__main__":
    go()

при выполнении кода выше,KeyboardInterrupt поднимается, когда я нажимаю ^C, но процесс просто зависает в этот момент, и я должен убить его внешне.

Я хочу иметь возможность нажать ^C в любое время и заставить все процессы выйти изящно.

8 115

8 ответов:

это ошибка Python. При ожидании условия в потоке.Состояние.ждать(), KeyboardInterrupt никогда не отправляется. Репро:

import threading
cond = threading.Condition(threading.Lock())
cond.acquire()
cond.wait(None)
print "done"

исключение KeyboardInterrupt не будет доставлено до тех пор, пока wait() не вернется, и оно никогда не вернется, поэтому прерывание никогда не происходит. KeyboardInterrupt почти наверняка должен прервать ожидание состояния.

обратите внимание, что это не происходит, если указан тайм-аут; cond.wait (1) немедленно получит прерывание. Так, обходной путь-указать тайм-аут. Для этого замените

    results = pool.map(slowly_square, range(40))

С

    results = pool.map_async(slowly_square, range(40)).get(9999999)

или аналогичные.

из того, что я недавно нашел, лучшим решением является настройка рабочих процессов для полного игнорирования SIGINT и ограничения всего кода очистки родительским процессом. Это устраняет проблему для простаивающих и занятых рабочих процессов и не требует кода обработки ошибок в дочерних процессах.

import signal

...

def init_worker():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

...

def main()
    pool = multiprocessing.Pool(size, init_worker)

    ...

    except KeyboardInterrupt:
        pool.terminate()
        pool.join()

объяснение и полный пример кода можно найти в http://noswap.com/blog/python-multiprocessing-keyboardinterrupt/ и http://github.com/jreese/multiprocessing-keyboardinterrupt соответственно.

по некоторым причинам, только исключения наследуются от базы Exception класс обрабатываются нормально. В качестве обходного пути вы можете повторно поднять свой KeyboardInterrupt как Exception например:

from multiprocessing import Pool
import time

class KeyboardInterruptError(Exception): pass

def f(x):
    try:
        time.sleep(x)
        return x
    except KeyboardInterrupt:
        raise KeyboardInterruptError()

def main():
    p = Pool(processes=4)
    try:
        print 'starting the pool map'
        print p.map(f, range(10))
        p.close()
        print 'pool map complete'
    except KeyboardInterrupt:
        print 'got ^C while pool mapping, terminating the pool'
        p.terminate()
        print 'pool is terminated'
    except Exception, e:
        print 'got exception: %r, terminating the pool' % (e,)
        p.terminate()
        print 'pool is terminated'
    finally:
        print 'joining pool processes'
        p.join()
        print 'join complete'
    print 'the end'

if __name__ == '__main__':
    main()

обычно вы получите следующий результат:

staring the pool map
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
pool map complete
joining pool processes
join complete
the end

так что если вы нажмете ^C вы получите:

staring the pool map
got ^C while pool mapping, terminating the pool
pool is terminated
joining pool processes
join complete
the end

обычно эта простая структура работает для Ctrl -C на бассейн :

def signal_handle(_signal, frame):
    print "Stopping the Jobs."

signal.signal(signal.SIGINT, signal_handle)

Как было сказано в нескольких подобных сообщениях:

Capture keyboardinterrupt в Python без try-except

кажется, есть две проблемы, которые делают исключения при мультипроцессорной раздражает. Первый (отметил Гленн) является то, что вам нужно использовать map_async с таймаутом вместо map чтобы получить немедленный ответ (т. е. не заканчивать обработку всего списка). Второй (отмеченный Андреем) заключается в том, что многопроцессорная обработка не ловит исключения, которые не наследуются от Exception (например, SystemExit). Итак, вот мое решение, которое касается обоих из них:

import sys
import functools
import traceback
import multiprocessing

def _poolFunctionWrapper(function, arg):
    """Run function under the pool

    Wrapper around function to catch exceptions that don't inherit from
    Exception (which aren't caught by multiprocessing, so that you end
    up hitting the timeout).
    """
    try:
        return function(arg)
    except:
        cls, exc, tb = sys.exc_info()
        if issubclass(cls, Exception):
            raise # No worries
        # Need to wrap the exception with something multiprocessing will recognise
        import traceback
        print "Unhandled exception %s (%s):\n%s" % (cls.__name__, exc, traceback.format_exc())
        raise Exception("Unhandled exception: %s (%s)" % (cls.__name__, exc))

def _runPool(pool, timeout, function, iterable):
    """Run the pool

    Wrapper around pool.map_async, to handle timeout.  This is required so as to
    trigger an immediate interrupt on the KeyboardInterrupt (Ctrl-C); see
    http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool

    Further wraps the function in _poolFunctionWrapper to catch exceptions
    that don't inherit from Exception.
    """
    return pool.map_async(functools.partial(_poolFunctionWrapper, function), iterable).get(timeout)

def myMap(function, iterable, numProcesses=1, timeout=9999):
    """Run the function on the iterable, optionally with multiprocessing"""
    if numProcesses > 1:
        pool = multiprocessing.Pool(processes=numProcesses, maxtasksperchild=1)
        mapFunc = functools.partial(_runPool, pool, timeout)
    else:
        pool = None
        mapFunc = map
    results = mapFunc(function, iterable)
    if pool is not None:
        pool.close()
        pool.join()
    return results

Я обнаружил, что на данный момент лучшим решением является не использовать многопроцессорную обработку.функция пула, но скорее сверните свою собственную функциональность пула. Я привел пример, демонстрирующий ошибку с apply_async, а также пример, показывающий, как избежать использования функциональности пула в целом.

http://www.bryceboe.com/2010/08/26/python-multiprocessing-and-keyboardinterrupt/

проголосованный ответ не решает основную проблему, но подобный побочный эффект.

Джесси Ноллер, автор многопроцессорной библиотеки, объясняет, как правильно обращаться с CTRL+C при использовании multiprocessing.Pool на блоге.

import signal
from multiprocessing import Pool


def initializer():
    """Ignore CTRL+C in the worker process."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)


pool = Pool(initializer=initializer)

try:
    pool.map(perform_download, dowloads)
except KeyboardInterrupt:
    pool.terminate()
    pool.join()

Я новичок в Python. Я искал везде ответ и наткнулся на это и несколько других блогов и видео на youtube. Я попытался скопировать вставить код автора выше и воспроизвести его на моем python 2.7.13 в windows 7 64 - бит. Это близко к тому, чего я хочу достичь.

Я сделал мои дочерние процессы игнорировать ControlC и сделать родительский процесс завершить. Похоже, что обход дочернего процесса позволяет избежать этой проблемы для меня.

#!/usr/bin/python

from multiprocessing import Pool
from time import sleep
from sys import exit


def slowly_square(i):
    try:
        print "<slowly_square> Sleeping and later running a square calculation..."
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print "<child processor> Don't care if you say CtrlC"
        pass


def go():
    pool = Pool(8)

    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        pool.terminate()
        pool.close()
        print "You cancelled the program!"
        exit(1)
    print "Finally, here are the results", results


if __name__ == '__main__':
    go()

часть начиная с pool.terminate() никогда, кажется, не выполнить.