Python Process Pool nicht dämonisch?

96

Wäre es möglich, einen Python-Pool zu erstellen, der nicht dämonisch ist? Ich möchte, dass ein Pool eine Funktion aufrufen kann, in der sich ein anderer Pool befindet.

Ich möchte dies, weil Deamon-Prozesse keinen Prozess erstellen können. Insbesondere wird der Fehler verursacht:

AssertionError: daemonic processes are not allowed to have children

Stellen Sie sich zum Beispiel das Szenario vor, in dem function_aein Pool ausgeführt wird, in function_bdem ein Pool ausgeführt wird function_c. Diese Funktionskette schlägt fehl, da function_bsie in einem Daemon-Prozess ausgeführt wird und Daemon-Prozesse keine Prozesse erstellen können.

Max
quelle
AFAIK, nein, es ist nicht möglich, dass alle Mitarbeiter im Pool dämonisiert sind und es nicht möglich ist , die Abhängigkeit zu injizieren . Übrigens verstehe ich den zweiten Teil Ihrer Frage nicht I want a pool to be able to call a function that has another pool insideund wie sich dies auf die Tatsache auswirkt, dass die Mitarbeiter dämonisiert sind.
Mouad
4
Denn wenn Funktion a einen Pool hat, der Funktion b ausführt, der einen Pool hat, der Funktion c ausführt, gibt es ein Problem in b, dass es in einem Dämonprozess ausgeführt wird und Dämonprozesse keine Prozesse erstellen können. AssertionError: daemonic processes are not allowed to have children
Max

Antworten:

117

Die multiprocessing.pool.PoolKlasse erstellt die Worker-Prozesse in ihrer __init__Methode, macht sie dämonisch und startet sie. Es ist nicht möglich, ihr daemonAttribut Falsevor dem Start zurückzusetzen (und danach ist dies nicht mehr zulässig). Sie können jedoch Ihre eigene Unterklasse von multiprocesing.pool.Pool( multiprocessing.Poolist nur eine Wrapper-Funktion) erstellen und Ihre eigene Unterklasse ersetzen multiprocessing.Process, die immer nicht dämonisch ist und für die Worker-Prozesse verwendet wird.

Hier ist ein vollständiges Beispiel dafür. Die wichtigen Teile sind die beiden Klassen NoDaemonProcessund MyPooloben und am Ende pool.close()und pool.join()auf Ihrer MyPoolInstanz aufzurufen .

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

import multiprocessing
# We must import this explicitly, it is not imported by the top-level
# multiprocessing module.
import multiprocessing.pool
import time

from random import randint


class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    def _get_daemon(self):
        return False
    def _set_daemon(self, value):
        pass
    daemon = property(_get_daemon, _set_daemon)

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class MyPool(multiprocessing.pool.Pool):
    Process = NoDaemonProcess

def sleepawhile(t):
    print("Sleeping %i seconds..." % t)
    time.sleep(t)
    return t

def work(num_procs):
    print("Creating %i (daemon) workers and jobs in child." % num_procs)
    pool = multiprocessing.Pool(num_procs)

    result = pool.map(sleepawhile,
        [randint(1, 5) for x in range(num_procs)])

    # The following is not really needed, since the (daemon) workers of the
    # child's pool are killed when the child is terminated, but it's good
    # practice to cleanup after ourselves anyway.
    pool.close()
    pool.join()
    return result

def test():
    print("Creating 5 (non-daemon) workers and jobs in main process.")
    pool = MyPool(5)

    result = pool.map(work, [randint(1, 5) for x in range(5)])

    pool.close()
    pool.join()
    print(result)

if __name__ == '__main__':
    test()
Chris Arndt
quelle
1
Ich habe meinen Code gerade erneut mit Python 2.7 / 3.2 (nach dem Korrigieren der "Druck" -Zeilen) unter Linux und Python 2.6 / 2.7 / 3.2 OS X getestet. Linux und Python 2.7 / 3.2 unter OS X funktionieren einwandfrei, aber der Code hängt tatsächlich mit Python 2.6 unter OS X (Lion). Dies scheint ein Fehler im Multiprocessing-Modul zu sein, der behoben wurde, aber ich habe den Bug-Tracker nicht überprüft.
Chris Arndt
1
Vielen Dank! Unter Windows müssen Sie auch anrufenmultiprocessing.freeze_support()
frmdstryr
2
Gute Arbeit. Wenn bei dieser Person ein Speicherverlust auftritt, versuchen Sie, "mit Schließen (MyPool (Prozesse = num_cpu)) als Pool:" zu verwenden, um den Pool ordnungsgemäß zu entsorgen
Chris Lucian
31
Was sind die Nachteile der Verwendung MyPoolanstelle der Standardeinstellung Pool? Mit anderen Worten, welche Kosten zahle ich im Austausch für die Flexibilität beim Starten von untergeordneten Prozessen? (Wenn es keine Kosten Poolgäbe, hätte der Standard vermutlich nicht-dämonische Prozesse verwendet).
Max
4
@machen Ja, das stimmt leider. In Python 3.6 wurde die PoolKlasse umfassend überarbeitet, es Processhandelt sich also nicht mehr um ein einfaches Attribut, sondern um eine Methode, die die Prozessinstanz zurückgibt, die sie aus einem Kontext erhält . Ich habe versucht, diese Methode zu überschreiben, um eine NoDaemonPoolInstanz zurückzugeben. Dies führt jedoch zu einer Ausnahme, AssertionError: daemonic processes are not allowed to have childrenwenn der Pool verwendet wird.
Chris Arndt
25

Ich musste in Python 3.7 einen nicht-dämonischen Pool verwenden und passte schließlich den in der akzeptierten Antwort angegebenen Code an. Unten befindet sich das Snippet, das den nicht-dämonischen Pool erstellt:

class NoDaemonProcess(multiprocessing.Process):
    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, value):
        pass


class NoDaemonContext(type(multiprocessing.get_context())):
    Process = NoDaemonProcess

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class MyPool(multiprocessing.pool.Pool):
    def __init__(self, *args, **kwargs):
        kwargs['context'] = NoDaemonContext()
        super(MyPool, self).__init__(*args, **kwargs)

Da die aktuelle Implementierung von multiprocessingumfassend überarbeitet wurde, um auf Kontexten zu basieren, müssen wir eine NoDaemonContextKlasse bereitstellen , die unser NoDaemonProcessas-Attribut hat. MyPoolverwendet dann diesen Kontext anstelle des Standardkontexts.

Trotzdem sollte ich warnen, dass dieser Ansatz mindestens zwei Einschränkungen aufweist:

  1. Dies hängt immer noch von den Implementierungsdetails des multiprocessingPakets ab und kann daher jederzeit unterbrochen werden.
  2. Es gibt triftige Gründe, warum multiprocessinges so schwierig war, nicht-dämonische Prozesse zu verwenden, von denen viele hier erklärt werden . Das überzeugendste ist meiner Meinung nach:

    Wenn Kinder-Threads mithilfe von Unterprozessen eigene Kinder erzeugen können, besteht die Gefahr, dass eine kleine Armee von Zombie-Enkelkindern erstellt wird, wenn entweder die übergeordneten oder untergeordneten Threads beendet werden, bevor der Unterprozess abgeschlossen ist und zurückkehrt.

Massimiliano
quelle
In Bezug auf die Einschränkung: Mein Anwendungsfall ist das Parallelisieren von Aufgaben, aber die Enkel geben Informationen an ihre Eltern zurück, die wiederum Informationen an ihre Eltern zurückgeben, nachdem sie eine erforderliche lokale Verarbeitung durchgeführt haben. Folglich wartet jede Ebene / jeder Zweig explizit auf alle ihre Blätter. Gilt die Einschränkung weiterhin, wenn Sie explizit warten müssen, bis die erzeugten Prozesse abgeschlossen sind?
A_A
Fehler AttributeError: module 'multiprocessing' has no attribute 'pool'in Python 3.8.0
Nyxynyx
@ Nyxynyx Vergiss nichtimport multiprocessing.pool
Chris Arndt
22

Das Multiprozessor- Modul verfügt über eine schöne Schnittstelle zur Verwendung von Pools mit Prozessen oder Threads. Abhängig von Ihrem aktuellen Anwendungsfall können Sie die Verwendung multiprocessing.pool.ThreadPoolfür Ihren äußeren Pool in Betracht ziehen , was zu Threads (die es ermöglichen, Prozesse von innen heraus zu erzeugen) im Gegensatz zu Prozessen führt.

Es könnte durch die GIL beschränkt sein, sondern in meinem speziellen Fall (I getestet beide) , die Startzeit für die Prozesse von den äußeren Poolwie geschaffen hier weit überwog mit der Lösung ThreadPool.


Es ist wirklich einfach zu tauschen Processesfür Threads. Lesen Sie hier oder hier mehr darüber, wie Sie eine ThreadPoolLösung verwenden .

timmwagener
quelle
Danke - das hat mir sehr geholfen - großartige Verwendung von Threading hier (um Prozesse zu
erzeugen,
1
Für Menschen, die nach einer praktischen Lösung suchen, die wahrscheinlich auf ihre Situation zutrifft, ist dies die richtige.
Abanana
6

Bei einigen Python-Versionen kann das Ersetzen des Standardpools durch einen benutzerdefinierten Fehler den folgenden Fehler auslösen : AssertionError: group argument must be None for now.

Hier habe ich eine Lösung gefunden, die helfen kann:

class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, val):
        pass


class NoDaemonProcessPool(multiprocessing.pool.Pool):

    def Process(self, *args, **kwds):
        proc = super(NoDaemonProcessPool, self).Process(*args, **kwds)
        proc.__class__ = NoDaemonProcess

        return proc
Atterratio
quelle
4

concurrent.futures.ProcessPoolExecutorhat diese Einschränkung nicht. Es kann problemlos einen verschachtelten Prozesspool haben:

from concurrent.futures import ProcessPoolExecutor as Pool
from itertools import repeat
from multiprocessing import current_process
import time

def pid():
    return current_process().pid

def _square(i):  # Runs in inner_pool
    square = i ** 2
    time.sleep(i / 10)
    print(f'{pid()=} {i=} {square=}')
    return square

def _sum_squares(i, j):  # Runs in outer_pool
    with Pool(max_workers=2) as inner_pool:
        squares = inner_pool.map(_square, (i, j))
    sum_squares = sum(squares)
    time.sleep(sum_squares ** .5)
    print(f'{pid()=}, {i=}, {j=} {sum_squares=}')
    return sum_squares

def main():
    with Pool(max_workers=3) as outer_pool:
        for sum_squares in outer_pool.map(_sum_squares, range(5), repeat(3)):
            print(f'{pid()=} {sum_squares=}')

if __name__ == "__main__":
    main()

Der obige Demonstrationscode wurde mit Python 3.8 getestet.

Gutschrift: Antwort von jfs

Scharfsinn
quelle
1
Dies ist jetzt eindeutig die beste Lösung, da nur minimale Änderungen erforderlich sind.
DreamFlasher
1
funktioniert perfekt! ... als Randnotiz mit einem Kind - multiprocessing.Poolin a ProcessPoolExecutor.Poolist auch möglich!
Raphael
3

Das Problem, auf das ich gestoßen bin, war der Versuch, Globals zwischen Modulen zu importieren, was dazu führte, dass die ProcessPool () - Zeile mehrmals ausgewertet wurde.

globals.py

from processing             import Manager, Lock
from pathos.multiprocessing import ProcessPool
from pathos.threading       import ThreadPool

class SingletonMeta(type):
    def __new__(cls, name, bases, dict):
        dict['__deepcopy__'] = dict['__copy__'] = lambda self, *args: self
        return super(SingletonMeta, cls).__new__(cls, name, bases, dict)

    def __init__(cls, name, bases, dict):
        super(SingletonMeta, cls).__init__(name, bases, dict)
        cls.instance = None

    def __call__(cls,*args,**kw):
        if cls.instance is None:
            cls.instance = super(SingletonMeta, cls).__call__(*args, **kw)
        return cls.instance

    def __deepcopy__(self, item):
        return item.__class__.instance

class Globals(object):
    __metaclass__ = SingletonMeta
    """     
    This class is a workaround to the bug: AssertionError: daemonic processes are not allowed to have children

    The root cause is that importing this file from different modules causes this file to be reevalutated each time, 
    thus ProcessPool() gets reexecuted inside that child thread, thus causing the daemonic processes bug    
    """
    def __init__(self):
        print "%s::__init__()" % (self.__class__.__name__)
        self.shared_manager      = Manager()
        self.shared_process_pool = ProcessPool()
        self.shared_thread_pool  = ThreadPool()
        self.shared_lock         = Lock()        # BUG: Windows: global name 'lock' is not defined | doesn't affect cygwin

Importieren Sie dann sicher von einer anderen Stelle in Ihrem Code

from globals import Globals
Globals().shared_manager      
Globals().shared_process_pool
Globals().shared_thread_pool  
Globals().shared_lock         
James McGuigan
quelle
2

Ich habe Leute Umgang mit diesem Problem gesehen durch die Verwendung celery‚s Gabel multiprocessinggenannt Billard (Multiprocessing - Pool - Erweiterungen), die dämonische Prozesse zu laichen Kinder erlaubt. Der Walkaround besteht darin, das multiprocessingModul einfach zu ersetzen durch:

import billiard as multiprocessing
Tomasz Bartkowiak
quelle