Den Fortschritt eines Python-Multiprocessing-Pools anzeigen imap_unordered call?

95

Ich habe ein Skript, das erfolgreich eine Reihe von Aufgaben des Multiprocessing Pool mit einem imap_unordered()Aufruf ausführt:

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
p.join() # Wait for completion

Mein Wert num_tasksliegt jedoch bei 250.000, sodass der join()Haupt-Thread etwa 10 Sekunden lang gesperrt wird. Ich möchte in der Lage sein, schrittweise zur Befehlszeile zurückzukehren, um anzuzeigen, dass der Hauptprozess nicht gesperrt ist. Etwas wie:

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  remaining = rs.tasks_remaining() # How many of the map call haven't been done yet?
  if (remaining == 0): break # Jump out of while loop
  print "Waiting for", remaining, "tasks to complete..."
  time.sleep(2)

Gibt es eine Methode für das Ergebnisobjekt oder den Pool selbst, die die Anzahl der verbleibenden Aufgaben angibt? Ich habe versucht, ein multiprocessing.ValueObjekt als Zähler zu verwenden ( do_workruft counter.value += 1nach Ausführung seiner Aufgabe eine Aktion auf), aber der Zähler erreicht nur ~ 85% des Gesamtwerts, bevor das Inkrementieren gestoppt wird.

Mitternachtsblitz
quelle

Antworten:

80

Es ist nicht erforderlich, auf private Attribute der Ergebnismenge zuzugreifen:

from __future__ import division
import sys

for i, _ in enumerate(p.imap_unordered(do_work, xrange(num_tasks)), 1):
    sys.stderr.write('\rdone {0:%}'.format(i/num_tasks))
jfs
quelle
7
Ich sehe den Ausdruck erst nach dem Beenden des Codes (nicht bei jeder Iteration). Hast du einen vorschlag
Hanan Shteingart
@ HananShteingart: Es funktioniert gut auf meinem System (Ubuntu) mit Python 2 und 3. Ich habe def do_word(*a): time.sleep(.1)als Beispiel verwendet. Wenn es bei Ihnen nicht funktioniert, erstellen Sie ein vollständiges Beispiel für minimalen Code, das Ihr Problem demonstriert: Beschreiben Sie mit Worten, was Sie erwarten und was stattdessen passiert, und erwähnen Sie, wie Sie Ihr Python-Skript ausführen, welches Betriebssystem Sie verwenden und welche Python-Version Sie verwenden und poste es als neue Frage .
JFS
13
Ich hatte das gleiche Problem wie @HananShteingart: Es liegt daran, dass ich versucht habe, es zu verwenden Pool.map(). Ich habe das nicht nur erkannt imap()und imap_unordered()arbeite auf diese Weise - die Dokumentation sagt nur "Eine faulere Version von map ()", bedeutet aber wirklich "der zugrunde liegende Iterator gibt Ergebnisse zurück, sobald sie eingehen".
Simonmacmullen
@simonmacmullen: sowohl die Frage als auch meine Antwort verwenden imap_unordered(). Hanans Problem ist wahrscheinlich darauf zurückzuführen sys.stderr.write('\r..')(dass dieselbe Zeile überschrieben wird, um den Fortschritt anzuzeigen).
JFS
2
Auch möglich! Ich wollte hauptsächlich eine dumme Annahme dokumentieren, die ich gemacht hatte - falls jemand anderes, der dies liest, es auch machte.
Simonmacmullen
94

Mein persönlicher Favorit - gibt Ihnen einen schönen kleinen Fortschrittsbalken und eine Abschluss-ETA, während die Dinge parallel laufen und sich verpflichten.

from multiprocessing import Pool
import tqdm

pool = Pool(processes=8)
for _ in tqdm.tqdm(pool.imap_unordered(do_work, tasks), total=len(tasks)):
    pass
Tim
quelle
64
Was ist, wenn der Pool einen Wert zurückgibt?
Nickpick
11
Ich habe eine leere Liste mit dem Namen result vor der Schleife erstellt und dann innerhalb der Schleife einfach result.append (x). Ich habe dies mit 2 Prozessen versucht und imap anstelle von map verwendet und alles hat so funktioniert, wie ich es wollte @nickpick
bs7280
2
Mein Fortschrittsbalken iteriert also zu neuen Zeilen, anstatt an Ort und Stelle fortzufahren. Gibt es eine Idee, warum dies so sein könnte?
Austin
2
Vergessen Sie nichtpip install tqdm
Mr. T
3
@ bs7280 Mit result.append (x) meinten Sie result.append (_)? Was ist x?
Jason
27

Ich stellte fest, dass die Arbeit bereits erledigt war, als ich versuchte, den Fortschritt zu überprüfen. Dies hat bei mir mit tqdm funktioniert .

pip install tqdm

from multiprocessing import Pool
from tqdm import tqdm

tasks = range(5)
pool = Pool()
pbar = tqdm(total=len(tasks))

def do_work(x):
    # do something with x
    pbar.update(1)

pool.imap_unordered(do_work, tasks)
pool.close()
pool.join()
pbar.close()

Dies sollte mit allen Arten der Mehrfachverarbeitung funktionieren, unabhängig davon, ob sie blockieren oder nicht.

reubano
quelle
4
Ich denke, erstellt eine Reihe von Threads, und jeder Thread zählt unabhängig
nburn42
Ich habe Funktionen innerhalb von Funktionen, die zu einem Beizfehler führen.
Ojunk
21

Ich habe selbst eine Antwort mit etwas mehr Graben gefunden: Als ich mir __dict__das imap_unorderedErgebnisobjekt ansah, stellte ich fest, dass es ein _indexAttribut hat, das mit jedem Abschluss der Aufgabe erhöht wird. Das funktioniert also für die Protokollierung, die in die whileSchleife eingeschlossen ist:

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  completed = rs._index
  if (completed == num_tasks): break
  print "Waiting for", num_tasks-completed, "tasks to complete..."
  time.sleep(2)

Ich habe jedoch festgestellt, dass das Austauschen von imap_unorderedfür eine map_asyncviel schnellere Ausführung führt, obwohl das Ergebnisobjekt etwas anders ist. Stattdessen hat das Ergebnisobjekt von map_asyncein _number_leftAttribut und eine ready()Methode:

p = multiprocessing.Pool()
rs = p.map_async(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  if (rs.ready()): break
  remaining = rs._number_left
  print "Waiting for", remaining, "tasks to complete..."
  time.sleep(0.5)
Mitternachtsblitz
quelle
3
Ich habe dies für Python 2.7.6 getestet und rs._number_left scheint die Anzahl der verbleibenden Chunks zu sein. Wenn also rs._chunksize nicht 1 ist, ist rs._number_left nicht die Anzahl der verbleibenden Listenelemente.
Allen
Wo soll ich diesen Code ablegen? Ich meine, dies wird erst ausgeführt, wenn der Inhalt von bekannt rsist und es etwas spät ist oder nicht?
Wakan Tanka
@WakanTanka: Es geht in das Hauptskript, nachdem es die zusätzlichen Threads abgespalten hat. In meinem ursprünglichen Beispiel geht es in die "while" -Schleife, in rsder bereits die anderen Threads gestartet wurden.
MidnightLightning
1
Könnten Sie bitte Ihre Frage und / oder Antwort bearbeiten, um ein Mindestarbeitsbeispiel zu zeigen? Ich sehe rsin keiner Schleife, ich bin ein Multiprozessor-Neuling und das würde helfen. Vielen Dank.
Wakan Tanka
1
Zumindest in funktioniert python 3.5die Lösung _number_leftnicht. _number_leftstellt die Chunks dar, die noch verarbeitet werden müssen. Wenn beispielsweise 50 Elemente parallel an meine Funktion übergeben werden sollen, werden für einen Thread-Pool mit 3 Prozessen _map_async()10 Chunks mit jeweils 5 Elementen erstellt. _number_leftstellt dann dar, wie viele dieser Blöcke abgeschlossen wurden.
mSSM
9

Ich weiß, dass dies eine ziemlich alte Frage ist, aber hier ist, was ich tue, wenn ich den Fortschritt eines Aufgabenpools in Python verfolgen möchte.

from progressbar import ProgressBar, SimpleProgress
import multiprocessing as mp
from time import sleep

def my_function(letter):
    sleep(2)
    return letter+letter

dummy_args = ["A", "B", "C", "D"]
pool = mp.Pool(processes=2)

results = []

pbar = ProgressBar(widgets=[SimpleProgress()], maxval=len(dummy_args)).start()

r = [pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args]

while len(results) != len(dummy_args):
    pbar.update(len(results))
    sleep(0.5)
pbar.finish()

print results

Grundsätzlich verwenden Sie apply_async mit einem Callbak (in diesem Fall wird der zurückgegebene Wert an eine Liste angehängt), sodass Sie nicht warten müssen, um etwas anderes zu tun. Anschließend überprüfen Sie innerhalb einer while-Schleife den Fortschritt der Arbeit. In diesem Fall habe ich ein Widget hinzugefügt, damit es besser aussieht.

Die Ausgabe:

4 of 4                                                                         
['AA', 'BB', 'CC', 'DD']

Ich hoffe es hilft.

Julien Tourille
quelle
Ich muss mich ändern: [pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args]für(pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args)
David Przybilla
Das ist nicht wahr. Ein Generatorobjekt funktioniert hier nicht. Überprüft.
Swagatam
9

Wie von Tim vorgeschlagen, können Sie dieses Problem verwenden tqdmund imaplösen. Ich bin gerade auf dieses Problem gestoßen und habe die imap_unorderedLösung optimiert , damit ich auf die Ergebnisse des Mappings zugreifen kann. So funktioniert das:

from multiprocessing import Pool
import tqdm

pool = multiprocessing.Pool(processes=4)
mapped_values = list(tqdm.tqdm(pool.imap_unordered(do_work, range(num_tasks)), total=len(values)))

Wenn Sie sich nicht für die von Ihren Jobs zurückgegebenen Werte interessieren, müssen Sie die Liste keiner Variablen zuweisen.

mrapacz
quelle
4

für alle, die nach einer einfachen Lösung suchen, mit der sie arbeiten Pool.apply_async():

from multiprocessing import Pool
from tqdm import tqdm
from time import sleep


def work(x):
    sleep(0.5)
    return x**2

n = 10

p = Pool(4)
pbar = tqdm(total=n)
res = [p.apply_async(work, args=(
    i,), callback=lambda _: pbar.update(1)) for i in range(n)]
results = [p.get() for p in res]
Zeawoas
quelle
3

Ich habe eine benutzerdefinierte Klasse erstellt, um einen Fortschrittsausdruck zu erstellen. Maby das hilft:

from multiprocessing import Pool, cpu_count


class ParallelSim(object):
    def __init__(self, processes=cpu_count()):
        self.pool = Pool(processes=processes)
        self.total_processes = 0
        self.completed_processes = 0
        self.results = []

    def add(self, func, args):
        self.pool.apply_async(func=func, args=args, callback=self.complete)
        self.total_processes += 1

    def complete(self, result):
        self.results.extend(result)
        self.completed_processes += 1
        print('Progress: {:.2f}%'.format((self.completed_processes/self.total_processes)*100))

    def run(self):
        self.pool.close()
        self.pool.join()

    def get_results(self):
        return self.results
Aronstef
quelle
1

Probieren Sie diesen einfachen, auf Warteschlangen basierenden Ansatz aus, der auch beim Pooling verwendet werden kann. Beachten Sie, dass beim Drucken nach dem Starten des Fortschrittsbalkens dieser verschoben wird, zumindest für diesen bestimmten Fortschrittsbalken. (PyPI Fortschritte 1.5)

import time
from progress.bar import Bar

def status_bar( queue_stat, n_groups, n ):

    bar = Bar('progress', max = n)  

    finished = 0
    while finished < n_groups:

        while queue_stat.empty():
            time.sleep(0.01)

        gotten = queue_stat.get()
        if gotten == 'finished':
            finished += 1
        else:
            bar.next()
    bar.finish()


def process_data( queue_data, queue_stat, group):

    for i in group:

        ... do stuff resulting in new_data

        queue_stat.put(1)

    queue_stat.put('finished')  
    queue_data.put(new_data)

def multiprocess():

    new_data = []

    groups = [[1,2,3],[4,5,6],[7,8,9]]
    combined = sum(groups,[])

    queue_data = multiprocessing.Queue()
    queue_stat = multiprocessing.Queue()

    for i, group in enumerate(groups): 

        if i == 0:

            p = multiprocessing.Process(target = status_bar,
                args=(queue_stat,len(groups),len(combined)))
                processes.append(p)
                p.start()

        p = multiprocessing.Process(target = process_data,
        args=(queue_data, queue_stat, group))
        processes.append(p)
        p.start()

    for i in range(len(groups)):
        data = queue_data.get() 
        new_data += data

    for p in processes:
        p.join()
Mott das Tupel
quelle