Multiprocessing: Verwenden Sie tqdm, um einen Fortschrittsbalken anzuzeigen

95

Um meinen Code "pythonischer" und schneller zu machen, verwende ich "Multiprocessing" und eine Kartenfunktion, um ihn a) die Funktion und b) den Bereich der Iterationen zu senden.

Die implantierte Lösung (dh tqdm direkt im Bereich tqdm.tqdm (Bereich (0, 30)) aufrufen) funktioniert nicht mit Multiprocessing (wie im folgenden Code formuliert).

Der Fortschrittsbalken wird von 0 bis 100% angezeigt (wenn Python den Code liest?), Zeigt jedoch nicht den tatsächlichen Fortschritt der Kartenfunktion an.

Wie kann ein Fortschrittsbalken angezeigt werden, der angibt, in welchem ​​Schritt sich die Kartenfunktion befindet?

from multiprocessing import Pool
import tqdm
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   p = Pool(2)
   r = p.map(_foo, tqdm.tqdm(range(0, 30)))
   p.close()
   p.join()

Jede Hilfe oder Anregungen sind willkommen ...

SciPy
quelle
Können Sie das Code-Snippet des Fortschrittsbalkens veröffentlichen?
Alex
1
Für Leute, die nach einer Lösung suchen mit .starmap(): Hier ist ein Patch zum PoolHinzufügen .istarmap(), der auch funktioniert tqdm.
Darkonaut

Antworten:

124

Verwenden Sie imap anstelle von map, wodurch ein Iterator für verarbeitete Werte zurückgegeben wird.

from multiprocessing import Pool
import tqdm
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   with Pool(2) as p:
      r = list(tqdm.tqdm(p.imap(_foo, range(30)), total=30))
hkyi
quelle
13
Eine umschließende list () -Anweisung wartet auf das Ende des Iterators. total = ist auch erforderlich, da tqdm nicht weiß, wie lange die Iteration
dauern
13
Gibt es eine ähnliche Lösung für starmap()?
Tarashypka
1
for i in tqdm.tqdm(...): pass kann ein list(tqdm.tqdm)
direkter sein
1
Dies funktioniert, aber hat jemand anderes den Fortschrittsbalken für jede Iteration kontinuierlich in einer neuen Zeile drucken lassen?
Dennis Subachev
3
Das Verhalten wird verdrahtet , wenn bestimmte chunk_sizevon p.imap. Kann tqdmjede Iteration anstelle jedes Blocks aktualisiert werden?
Huangbiubiu
50

Lösung gefunden: Seien Sie vorsichtig! Aufgrund der Mehrfachverarbeitung kann die Schätzzeit (Iteration pro Schleife, Gesamtzeit usw.) instabil sein, aber der Fortschrittsbalken funktioniert einwandfrei.

Hinweis: Der Kontextmanager für Pool ist nur ab Python Version 3.3 verfügbar

from multiprocessing import Pool
import time
from tqdm import *

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
    with Pool(processes=2) as p:
        max_ = 30
        with tqdm(total=max_) as pbar:
            for i, _ in enumerate(p.imap_unordered(_foo, range(0, max_))):
                pbar.update()
SciPy
quelle
2
pbar.close()nicht erforderlich, wird es automatisch bei Beendigung vonwith
Sagar Kar
5
Ist hier der zweite / innere tqdmAnruf notwendig?
Shadowtalker
5
Was ist mit der Ausgabe von _foo (my_number), die als "r" zurückgegeben wird?
Likak
3
Gibt es eine ähnliche Lösung für starmap()?
Tarashypka
2
@shadowtalker - es scheint ohne zu funktionieren;). Wie auch immer - imap_unorderedist hier der Schlüssel, es bietet die beste Leistung und die besten Schätzungen für den Fortschrittsbalken.
Tomasz Gandor
17

Sie können p_tqdmstattdessen verwenden.

https://github.com/swansonk14/p_tqdm

from p_tqdm import p_map
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   r = p_map(_foo, list(range(0, 30)))
Victor Quach
quelle
1
Das funktioniert sehr gut und war sehr einfach pip install. Dies ersetzt tqdm für die meisten meiner Bedürfnisse
crypdick
Merci Victor;)
Gabriel Romon
p_tqdmist beschränkt auf multiprocessing.Pool, nicht verfügbar für Threads
pateheo
15

Es tut uns leid, dass Sie zu spät kommen, aber wenn Sie nur eine gleichzeitige Karte benötigen, ist in der neuesten Version ( tqdm>=4.42.0) jetzt Folgendes integriert:

from tqdm.contrib.concurrent import process_map  # or thread_map
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   r = process_map(_foo, range(0, 30), max_workers=2)

Verweise: https://tqdm.github.io/docs/contrib.concurrent/ und https://github.com/tqdm/tqdm/blob/master/examples/parallel_bars.py

casper.dcl
quelle
Danke dafür. Funktioniert problemlos, viel besser als jede andere Lösung, die ich ausprobiert habe.
user3340499
Cool (+1), wirft aber HBox(children=(FloatProgress(value=0.0, max=30.0), HTML(value='')))Jupyter
Ébe Isaac
@ Ébe-Isaac siehe github.com/tqdm/tqdm/issues/937
casper.dcl
Ich sehe ein Problem mit der Diskussion zum Hacken von tqdm_notebook, kann jedoch keine Lösung für tqdm.contrib.concurrent finden.
Ébe Isaac
8

Basierend auf der Antwort von Xavi Martínez habe ich die Funktion geschrieben imap_unordered_bar. Es kann auf die gleiche Weise verwendet werden, imap_unorderedmit dem einzigen Unterschied, dass eine Verarbeitungsleiste angezeigt wird.

from multiprocessing import Pool
import time
from tqdm import *

def imap_unordered_bar(func, args, n_processes = 2):
    p = Pool(n_processes)
    res_list = []
    with tqdm(total = len(args)) as pbar:
        for i, res in tqdm(enumerate(p.imap_unordered(func, args))):
            pbar.update()
            res_list.append(res)
    pbar.close()
    p.close()
    p.join()
    return res_list

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square 

if __name__ == '__main__':
    result = imap_unordered_bar(_foo, range(5))
Oliver Wilken
quelle
3
Dadurch wird der Balken bei jedem Schritt in einer neuen Zeile neu gezeichnet. Wie aktualisiere ich dieselbe Zeile?
misantroop
Lösung in meinem Fall (Windows / Powershell): Colorama.
Misantroop
'pbar.close () nicht erforderlich, es wird automatisch bei Beendigung von' geschlossen, wie der Kommentar, den Sagar zu @ scipys Antwort abgegeben hat
Tejas Shetty
0
import multiprocessing as mp
import tqdm


some_iterable = ...

def some_func():
    # your logic
    ...


if __name__ == '__main__':
    with mp.Pool(mp.cpu_count()-2) as p:
        list(tqdm.tqdm(p.imap(some_func, iterable), total=len(iterable)))
dkrynicki
quelle
0

Hier ist meine Einstellung, wann Sie Ergebnisse von Ihren parallel ausgeführten Funktionen zurückerhalten müssen. Diese Funktion erledigt einige Dinge (es gibt einen anderen Beitrag von mir, der dies weiter erklärt), aber der entscheidende Punkt ist, dass eine Warteschlange für anstehende Aufgaben und eine Warteschlange für abgeschlossene Aufgaben vorhanden ist. Wenn die Mitarbeiter mit jeder Aufgabe in der ausstehenden Warteschlange fertig sind, fügen sie die Ergebnisse in die Warteschlange für abgeschlossene Aufgaben ein. Sie können die Prüfung mit der Fortschrittsanzeige tqdm in die Warteschlange für abgeschlossene Aufgaben einschließen. Ich stelle die Implementierung der Funktion do_work () hier nicht ein, sie ist nicht relevant, da hier die Meldung lautet, dass die Warteschlange für abgeschlossene Aufgaben überwacht und der Fortschrittsbalken jedes Mal aktualisiert werden soll, wenn ein Ergebnis vorliegt.

def par_proc(job_list, num_cpus=None, verbose=False):

# Get the number of cores
if not num_cpus:
    num_cpus = psutil.cpu_count(logical=False)

print('* Parallel processing')
print('* Running on {} cores'.format(num_cpus))

# Set-up the queues for sending and receiving data to/from the workers
tasks_pending = mp.Queue()
tasks_completed = mp.Queue()

# Gather processes and results here
processes = []
results = []

# Count tasks
num_tasks = 0

# Add the tasks to the queue
for job in job_list:
    for task in job['tasks']:
        expanded_job = {}
        num_tasks = num_tasks + 1
        expanded_job.update({'func': pickle.dumps(job['func'])})
        expanded_job.update({'task': task})
        tasks_pending.put(expanded_job)

# Set the number of workers here
num_workers = min(num_cpus, num_tasks)

# We need as many sentinels as there are worker processes so that ALL processes exit when there is no more
# work left to be done.
for c in range(num_workers):
    tasks_pending.put(SENTINEL)

print('* Number of tasks: {}'.format(num_tasks))

# Set-up and start the workers
for c in range(num_workers):
    p = mp.Process(target=do_work, args=(tasks_pending, tasks_completed, verbose))
    p.name = 'worker' + str(c)
    processes.append(p)
    p.start()

# Gather the results
completed_tasks_counter = 0

with tqdm(total=num_tasks) as bar:
    while completed_tasks_counter < num_tasks:
        results.append(tasks_completed.get())
        completed_tasks_counter = completed_tasks_counter + 1
        bar.update(completed_tasks_counter)

for p in processes:
    p.join()

return results
Nick B.
quelle
-2

Dieser Ansatz ist einfach und funktioniert.

from multiprocessing.pool import ThreadPool
import time
from tqdm import tqdm

def job():
    time.sleep(1)
    pbar.update()

pool = ThreadPool(5)
with tqdm(total=100) as pbar:
    for i in range(100):
        pool.apply_async(job)
    pool.close()
    pool.join()
Vijayabhaskar J.
quelle