Ich habe ein Skript, das erfolgreich eine Reihe von Aufgaben des Multiprocessing Pool mit einem imap_unordered()
Aufruf ausführt:
p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
p.join() # Wait for completion
Mein Wert num_tasks
liegt jedoch bei 250.000, sodass der join()
Haupt-Thread etwa 10 Sekunden lang gesperrt wird. Ich möchte in der Lage sein, schrittweise zur Befehlszeile zurückzukehren, um anzuzeigen, dass der Hauptprozess nicht gesperrt ist. Etwas wie:
p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
remaining = rs.tasks_remaining() # How many of the map call haven't been done yet?
if (remaining == 0): break # Jump out of while loop
print "Waiting for", remaining, "tasks to complete..."
time.sleep(2)
Gibt es eine Methode für das Ergebnisobjekt oder den Pool selbst, die die Anzahl der verbleibenden Aufgaben angibt? Ich habe versucht, ein multiprocessing.Value
Objekt als Zähler zu verwenden ( do_work
ruft counter.value += 1
nach Ausführung seiner Aufgabe eine Aktion auf), aber der Zähler erreicht nur ~ 85% des Gesamtwerts, bevor das Inkrementieren gestoppt wird.
quelle
def do_word(*a): time.sleep(.1)
als Beispiel verwendet. Wenn es bei Ihnen nicht funktioniert, erstellen Sie ein vollständiges Beispiel für minimalen Code, das Ihr Problem demonstriert: Beschreiben Sie mit Worten, was Sie erwarten und was stattdessen passiert, und erwähnen Sie, wie Sie Ihr Python-Skript ausführen, welches Betriebssystem Sie verwenden und welche Python-Version Sie verwenden und poste es als neue Frage .Pool.map()
. Ich habe das nicht nur erkanntimap()
undimap_unordered()
arbeite auf diese Weise - die Dokumentation sagt nur "Eine faulere Version von map ()", bedeutet aber wirklich "der zugrunde liegende Iterator gibt Ergebnisse zurück, sobald sie eingehen".imap_unordered()
. Hanans Problem ist wahrscheinlich darauf zurückzuführensys.stderr.write('\r..')
(dass dieselbe Zeile überschrieben wird, um den Fortschritt anzuzeigen).Mein persönlicher Favorit - gibt Ihnen einen schönen kleinen Fortschrittsbalken und eine Abschluss-ETA, während die Dinge parallel laufen und sich verpflichten.
quelle
pip install tqdm
Ich stellte fest, dass die Arbeit bereits erledigt war, als ich versuchte, den Fortschritt zu überprüfen. Dies hat bei mir mit tqdm funktioniert .
pip install tqdm
Dies sollte mit allen Arten der Mehrfachverarbeitung funktionieren, unabhängig davon, ob sie blockieren oder nicht.
quelle
Ich habe selbst eine Antwort mit etwas mehr Graben gefunden: Als ich mir
__dict__
dasimap_unordered
Ergebnisobjekt ansah, stellte ich fest, dass es ein_index
Attribut hat, das mit jedem Abschluss der Aufgabe erhöht wird. Das funktioniert also für die Protokollierung, die in diewhile
Schleife eingeschlossen ist:Ich habe jedoch festgestellt, dass das Austauschen von
imap_unordered
für einemap_async
viel schnellere Ausführung führt, obwohl das Ergebnisobjekt etwas anders ist. Stattdessen hat das Ergebnisobjekt vonmap_async
ein_number_left
Attribut und eineready()
Methode:quelle
rs
ist und es etwas spät ist oder nicht?rs
der bereits die anderen Threads gestartet wurden.rs
in keiner Schleife, ich bin ein Multiprozessor-Neuling und das würde helfen. Vielen Dank.python 3.5
die Lösung_number_left
nicht._number_left
stellt die Chunks dar, die noch verarbeitet werden müssen. Wenn beispielsweise 50 Elemente parallel an meine Funktion übergeben werden sollen, werden für einen Thread-Pool mit 3 Prozessen_map_async()
10 Chunks mit jeweils 5 Elementen erstellt._number_left
stellt dann dar, wie viele dieser Blöcke abgeschlossen wurden.Ich weiß, dass dies eine ziemlich alte Frage ist, aber hier ist, was ich tue, wenn ich den Fortschritt eines Aufgabenpools in Python verfolgen möchte.
Grundsätzlich verwenden Sie apply_async mit einem Callbak (in diesem Fall wird der zurückgegebene Wert an eine Liste angehängt), sodass Sie nicht warten müssen, um etwas anderes zu tun. Anschließend überprüfen Sie innerhalb einer while-Schleife den Fortschritt der Arbeit. In diesem Fall habe ich ein Widget hinzugefügt, damit es besser aussieht.
Die Ausgabe:
Ich hoffe es hilft.
quelle
[pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args]
für(pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args)
Wie von Tim vorgeschlagen, können Sie dieses Problem verwenden
tqdm
undimap
lösen. Ich bin gerade auf dieses Problem gestoßen und habe dieimap_unordered
Lösung optimiert , damit ich auf die Ergebnisse des Mappings zugreifen kann. So funktioniert das:Wenn Sie sich nicht für die von Ihren Jobs zurückgegebenen Werte interessieren, müssen Sie die Liste keiner Variablen zuweisen.
quelle
für alle, die nach einer einfachen Lösung suchen, mit der sie arbeiten
Pool.apply_async()
:quelle
Ich habe eine benutzerdefinierte Klasse erstellt, um einen Fortschrittsausdruck zu erstellen. Maby das hilft:
quelle
Probieren Sie diesen einfachen, auf Warteschlangen basierenden Ansatz aus, der auch beim Pooling verwendet werden kann. Beachten Sie, dass beim Drucken nach dem Starten des Fortschrittsbalkens dieser verschoben wird, zumindest für diesen bestimmten Fortschrittsbalken. (PyPI Fortschritte 1.5)
quelle