Die Dokumentation für das multiprocessing
Modul zeigt, wie eine Warteschlange an einen Prozess übergeben wird, mit dem begonnen wurde multiprocessing.Process
. Aber wie kann ich eine Warteschlange für asynchrone Arbeitsprozesse freigeben, mit denen begonnen wurde apply_async
? Ich brauche keine dynamische Verbindung oder irgendetwas anderes, nur eine Möglichkeit für die Arbeiter, ihre Ergebnisse (wiederholt) an die Basis zurückzumelden.
import multiprocessing
def worker(name, que):
que.put("%d is done" % name)
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=3)
q = multiprocessing.Queue()
workers = pool.apply_async(worker, (33, q))
Dies schlägt fehl mit :
RuntimeError: Queue objects should only be shared between processes through inheritance
. Ich verstehe, was dies bedeutet, und ich verstehe den Rat zu erben, anstatt das Beizen / Entpicken (und alle speziellen Windows-Einschränkungen) zu erfordern. Aber wie kann ich pass die Warteschlange in einer Art und Weise , das funktioniert? Ich kann kein Beispiel finden und habe mehrere Alternativen ausprobiert, die auf verschiedene Weise fehlgeschlagen sind. Hilfe bitte?
queue.Queue()
das nicht geeignet ist?queue.Queue
wurde für das Threading mit In-Memory-Sperren erstellt. In einer Multiprozess-Umgebung erhält jeder Unterprozess eine eigene Kopie einerqueue.Queue()
Instanz in seinem eigenen Speicherbereich, da Unterprozesse (meistens) keinen gemeinsamen Speicher haben.multiprocessing.Pool
hat bereits eine gemeinsame Ergebniswarteschlange, es ist nicht erforderlich, zusätzlich a einzubeziehenManager.Queue
.Manager.Queue
ist einequeue.Queue
(Multithreading-Warteschlange) unter der Haube, die sich auf einem separaten Serverprozess befindet und über Proxys verfügbar gemacht wird. Dies erhöht den Overhead im Vergleich zur internen Warteschlange von Pool. Im Gegensatz zur nativen Ergebnisbehandlung von Pool kannManager.Queue
auch nicht garantiert werden , dass die Ergebnisse in der Reihenfolge bestellt werden.Die Worker-Prozesse werden nicht gestartet
.apply_async()
. Dies geschieht bereits beim InstanziierenPool
. Was wird gestartet , wenn Sie anrufenpool.apply_async()
ist ein neuer „Job“. Die Arbeitsprozesse von Pool führen die Funktionmultiprocessing.pool.worker
unter der Haube aus. Diese Funktion kümmert sich um die Verarbeitung neuer "Aufgaben", die über den internen Pool übertragen wurden,Pool._inqueue
und um das Zurücksenden der Ergebnisse an den übergeordneten Pool über den PoolPool._outqueue
. Ihre Angabefunc
wird innerhalb ausgeführtmultiprocessing.pool.worker
.func
muss nurreturn
etwas und das Ergebnis wird automatisch an die Eltern zurückgesendet..apply_async()
gibt sofort (asynchron) einAsyncResult
Objekt zurück (Alias fürApplyResult
). Sie müssen.get()
dieses Objekt aufrufen (blockiert es), um das tatsächliche Ergebnis zu erhalten. Eine andere Möglichkeit wäre, eine Rückruffunktion zu registrieren , die ausgelöst wird, sobald das Ergebnis fertig ist.Beispielausgabe:
Hinweis: Wenn Sie den
timeout
Parameter-für angeben,.get()
wird die eigentliche Verarbeitung der Aufgabe innerhalb des Workers nicht gestoppt, sondern nur das wartende übergeordnete Element durch Auslösen von a entsperrtmultiprocessing.TimeoutError
.quelle
error_callback
-parameter fürapply_async
, so dass sich seitdem nicht viel geändert hat.