Пул процессов Python не демонический?
можно ли создать пул python, который не является демоническим? Я хочу, чтобы пул мог вызывать функцию, которая имеет другой пул внутри.
Я хочу этого, потому что процессы deamon не могут создать процесс. В частности, это вызовет ошибку:
AssertionError: daemonic processes are not allowed to have children
например, рассмотрим сценарий, где function_a
есть бассейн, который работает function_b
который имеет пул, который работает function_c
. Эта цепочка функций завершится неудачей, потому что function_b
выполняется в процессе демона, и процессы демона не могут создавать процессы.
3 ответа:
The
multiprocessing.pool.Pool
класс создает рабочие процессы в его__init__
метод, делает их демоническими и запускает их, и невозможно повторно установить их доFalse
прежде чем они будут запущены (и после этого это больше не разрешено). Но вы можете создать свой собственный подклассmultiprocesing.pool.Pool
(multiprocessing.Pool
это просто функция обертки) и замените свой собственныйmultiprocessing.Process
подкласс, который всегда не является демоническим, который будет использоваться для рабочих процессов.вот полный пример о том, как это сделать. Важными частями являются два класса
NoDaemonProcess
иMyPool
наверху и позвонитьpool.close()
иpool.join()
наMyPool
экземпляр в конце.#!/usr/bin/env python # -*- coding: UTF-8 -*- import multiprocessing # We must import this explicitly, it is not imported by the top-level # multiprocessing module. import multiprocessing.pool import time from random import randint class NoDaemonProcess(multiprocessing.Process): # make 'daemon' attribute always return False def _get_daemon(self): return False def _set_daemon(self, value): pass daemon = property(_get_daemon, _set_daemon) # We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool # because the latter is only a wrapper function, not a proper class. class MyPool(multiprocessing.pool.Pool): Process = NoDaemonProcess def sleepawhile(t): print("Sleeping %i seconds..." % t) time.sleep(t) return t def work(num_procs): print("Creating %i (daemon) workers and jobs in child." % num_procs) pool = multiprocessing.Pool(num_procs) result = pool.map(sleepawhile, [randint(1, 5) for x in range(num_procs)]) # The following is not really needed, since the (daemon) workers of the # child's pool are killed when the child is terminated, but it's good # practice to cleanup after ourselves anyway. pool.close() pool.join() return result def test(): print("Creating 5 (non-daemon) workers and jobs in main process.") pool = MyPool(5) result = pool.map(work, [randint(1, 5) for x in range(5)]) pool.close() pool.join() print(result) if __name__ == '__main__': test()
The многопроцессорная обработка модуль имеет приятный интерфейс для использования пулов с процессами или потоки. В зависимости от вашего текущего варианта использования, вы можете использовать
multiprocessing.pool.ThreadPool
для вашего внешнего пула, что приведет к потокам (что позволяет порождать процессы изнутри) в отличие от процессов.это может быть ограничено GIL, но в моем конкретном случае (Я тестировал оба), время запуска процессов с внешней стороны
Pool
создан здесь намного перевесило решение сThreadPool
.
это действительно легко поменять
Processes
наThreads
. Подробнее о том, как использоватьThreadPool
решение здесь или здесь.
проблема, с которой я столкнулся, заключалась в попытке импортировать глобалы между модулями, в результате чего строка ProcessPool() оценивалась несколько раз.
globals.py
from processing import Manager, Lock from pathos.multiprocessing import ProcessPool from pathos.threading import ThreadPool class SingletonMeta(type): def __new__(cls, name, bases, dict): dict['__deepcopy__'] = dict['__copy__'] = lambda self, *args: self return super(SingletonMeta, cls).__new__(cls, name, bases, dict) def __init__(cls, name, bases, dict): super(SingletonMeta, cls).__init__(name, bases, dict) cls.instance = None def __call__(cls,*args,**kw): if cls.instance is None: cls.instance = super(SingletonMeta, cls).__call__(*args, **kw) return cls.instance def __deepcopy__(self, item): return item.__class__.instance class Globals(object): __metaclass__ = SingletonMeta """ This class is a workaround to the bug: AssertionError: daemonic processes are not allowed to have children The root cause is that importing this file from different modules causes this file to be reevalutated each time, thus ProcessPool() gets reexecuted inside that child thread, thus causing the daemonic processes bug """ def __init__(self): print "%s::__init__()" % (self.__class__.__name__) self.shared_manager = Manager() self.shared_process_pool = ProcessPool() self.shared_thread_pool = ThreadPool() self.shared_lock = Lock() # BUG: Windows: global name 'lock' is not defined | doesn't affect cygwin
затем безопасно импортировать из других частей кода
from globals import Globals Globals().shared_manager Globals().shared_process_pool Globals().shared_thread_pool Globals().shared_lock