Teilen einer Ergebniswarteschlange auf mehrere Prozesse

91

Die Dokumentation für das multiprocessingModul zeigt, wie eine Warteschlange an einen Prozess übergeben wird, mit dem begonnen wurde multiprocessing.Process. Aber wie kann ich eine Warteschlange für asynchrone Arbeitsprozesse freigeben, mit denen begonnen wurde apply_async? Ich brauche keine dynamische Verbindung oder irgendetwas anderes, nur eine Möglichkeit für die Arbeiter, ihre Ergebnisse (wiederholt) an die Basis zurückzumelden.

import multiprocessing
def worker(name, que):
    que.put("%d is done" % name)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    q = multiprocessing.Queue()
    workers = pool.apply_async(worker, (33, q))

Dies schlägt fehl mit : RuntimeError: Queue objects should only be shared between processes through inheritance. Ich verstehe, was dies bedeutet, und ich verstehe den Rat zu erben, anstatt das Beizen / Entpicken (und alle speziellen Windows-Einschränkungen) zu erfordern. Aber wie kann ich pass die Warteschlange in einer Art und Weise , das funktioniert? Ich kann kein Beispiel finden und habe mehrere Alternativen ausprobiert, die auf verschiedene Weise fehlgeschlagen sind. Hilfe bitte?

alexis
quelle

Antworten:

133

Verwenden Sie multiprocessing.Manager , um Ihre Warteschlange zu verwalten und sie auch verschiedenen Mitarbeitern zugänglich zu machen.

import multiprocessing
def worker(name, que):
    que.put("%d is done" % name)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    m = multiprocessing.Manager()
    q = m.Queue()
    workers = pool.apply_async(worker, (33, q))
enderskill
quelle
Das hat es geschafft, danke! Es gab ein nicht verwandtes Problem mit dem asynchronen Aufruf in meinem ursprünglichen Code, daher habe ich das Update auch in Ihre Antwort kopiert.
Alexis
15
Irgendeine Erklärung, warum queue.Queue()das nicht geeignet ist?
Mrgloom
@mrgloom: queue.Queuewurde für das Threading mit In-Memory-Sperren erstellt. In einer Multiprozess-Umgebung erhält jeder Unterprozess eine eigene Kopie einer queue.Queue()Instanz in seinem eigenen Speicherbereich, da Unterprozesse (meistens) keinen gemeinsamen Speicher haben.
LeoRochael
@alexis Wie erhalte ich die Elemente aus dem Manager (). Queue (), nachdem mehrere Worker Daten in den Manager () eingefügt haben?
MSS
10

multiprocessing.Poolhat bereits eine gemeinsame Ergebniswarteschlange, es ist nicht erforderlich, zusätzlich a einzubeziehen Manager.Queue. Manager.Queueist eine queue.Queue(Multithreading-Warteschlange) unter der Haube, die sich auf einem separaten Serverprozess befindet und über Proxys verfügbar gemacht wird. Dies erhöht den Overhead im Vergleich zur internen Warteschlange von Pool. Im Gegensatz zur nativen Ergebnisbehandlung von Pool kann Manager.Queueauch nicht garantiert werden , dass die Ergebnisse in der Reihenfolge bestellt werden.

Die Worker-Prozesse werden nicht gestartet .apply_async(). Dies geschieht bereits beim Instanziieren Pool. Was wird gestartet , wenn Sie anrufen pool.apply_async()ist ein neuer „Job“. Die Arbeitsprozesse von Pool führen die Funktion multiprocessing.pool.workerunter der Haube aus. Diese Funktion kümmert sich um die Verarbeitung neuer "Aufgaben", die über den internen Pool übertragen wurden, Pool._inqueueund um das Zurücksenden der Ergebnisse an den übergeordneten Pool über den Pool Pool._outqueue. Ihre Angabe funcwird innerhalb ausgeführt multiprocessing.pool.worker. funcmuss nur returnetwas und das Ergebnis wird automatisch an die Eltern zurückgesendet.

.apply_async() gibt sofort (asynchron) ein AsyncResultObjekt zurück (Alias ​​für ApplyResult). Sie müssen .get()dieses Objekt aufrufen (blockiert es), um das tatsächliche Ergebnis zu erhalten. Eine andere Möglichkeit wäre, eine Rückruffunktion zu registrieren , die ausgelöst wird, sobald das Ergebnis fertig ist.

from multiprocessing import Pool

def busy_foo(i):
    """Dummy function simulating cpu-bound work."""
    for _ in range(int(10e6)):  # do stuff
        pass
    return i

if __name__ == '__main__':

    with Pool(4) as pool:
        print(pool._outqueue)  # DEMO
        results = [pool.apply_async(busy_foo, (i,)) for i in range(10)]
        # `.apply_async()` immediately returns AsyncResult (ApplyResult) object
        print(results[0])  # DEMO
        results = [res.get() for res in results]
        print(f'result: {results}')       

Beispielausgabe:

<multiprocessing.queues.SimpleQueue object at 0x7fa124fd67f0>
<multiprocessing.pool.ApplyResult object at 0x7fa12586da20>
result: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Hinweis: Wenn Sie den timeoutParameter-für angeben, .get()wird die eigentliche Verarbeitung der Aufgabe innerhalb des Workers nicht gestoppt, sondern nur das wartende übergeordnete Element durch Auslösen von a entsperrt multiprocessing.TimeoutError.

Darkonaut
quelle
Interessant, ich probiere es aus, wenn ich die erste Chance bekomme. Es hat 2012 sicherlich nicht so funktioniert.
Alexis
@alexis Python 2.7 (2010) fehlt hier nur der Kontextmanager und der error_callback-parameter für apply_async, so dass sich seitdem nicht viel geändert hat.
Darkonaut