Fortschrittsanzeige während Pandas-Operationen

157

Ich führe regelmäßig Pandas-Operationen an Datenrahmen mit mehr als 15 Millionen Zeilen durch und würde gerne Zugriff auf eine Fortschrittsanzeige für bestimmte Operationen haben.

Gibt es eine textbasierte Fortschrittsanzeige für Pandas Split-Apply-Combine-Operationen?

Zum Beispiel in so etwas wie:

df_users.groupby(['userID', 'requestDate']).apply(feature_rollup)

Dabei feature_rolluphandelt es sich um eine etwas komplizierte Funktion, die viele DF-Spalten verwendet und mithilfe verschiedener Methoden neue Benutzerspalten erstellt. Diese Vorgänge können bei großen Datenrahmen eine Weile dauern. Daher möchte ich wissen, ob eine textbasierte Ausgabe in einem iPython-Notizbuch möglich ist, das mich über den Fortschritt auf dem Laufenden hält.

Bisher habe ich kanonische Loop-Fortschrittsindikatoren für Python ausprobiert, aber sie interagieren nicht auf sinnvolle Weise mit Pandas.

Ich hoffe, dass ich in der Pandas-Bibliothek / -Dokumentation etwas übersehen habe, das es einem ermöglicht, den Fortschritt eines Split-Apply-Combine zu erkennen. Eine einfache Implementierung würde möglicherweise die Gesamtzahl der Datenrahmen-Teilmengen betrachten, an denen die applyFunktion arbeitet, und den Fortschritt als den abgeschlossenen Bruchteil dieser Teilmengen melden.

Muss das vielleicht der Bibliothek hinzugefügt werden?

cwharland
quelle
Hast du einen% Prun (Profil) für den Code gemacht? Manchmal können Sie Operationen am gesamten Frame ausführen, bevor Sie sich bewerben, um Engpässe zu beseitigen
Jeff
@ Jeff: Wetten, ich habe das früher getan, um die letzte Leistung herauszuholen? Das Problem hängt wirklich mit der Pseudo-Map-Reduce-Grenze zusammen, an der ich arbeite, da die Zeilen im zweistelligen Millionenbereich liegen. Ich erwarte also keine Super-Geschwindigkeitssteigerungen, sondern möchte nur ein Feedback zum Fortschritt.
cwharland
Betrachten Sie Cythonisierung: pandas.pydata.org/pandas-docs/dev/…
Andy Hayden
@AndyHayden - Wie ich zu Ihrer Antwort kommentiert habe, ist Ihre Implementierung recht gut und verlängert den Gesamtauftrag um ein wenig Zeit. Ich habe auch drei Vorgänge innerhalb des Feature-Rollups zythonisiert, die die gesamte Zeit wiedererlangt haben, die jetzt dem Berichterstattungsfortschritt gewidmet ist. Am Ende wette ich also, dass ich Fortschrittsbalken mit einer Verkürzung der Gesamtverarbeitungszeit habe, wenn ich Cython für die gesamte Funktion durchführe.
cwharland

Antworten:

277

Aufgrund der großen Nachfrage tqdmhat Unterstützung für hinzugefügt pandas. Im Gegensatz zu den anderen Antworten wird dies Pandas nicht merklich verlangsamen - hier ein Beispiel für DataFrameGroupBy.progress_apply:

import pandas as pd
import numpy as np
from tqdm import tqdm
# from tqdm.auto import tqdm  # for notebooks

df = pd.DataFrame(np.random.randint(0, int(1e8), (10000, 1000)))

# Create and register a new `tqdm` instance with `pandas`
# (can use tqdm_gui, optional kwargs, etc.)
tqdm.pandas()

# Now you can use `progress_apply` instead of `apply`
df.groupby(0).progress_apply(lambda x: x**2)

Wenn Sie daran interessiert sind, wie dies funktioniert (und wie Sie es für Ihre eigenen Rückrufe ändern können), lesen Sie die Beispiele zu github , die vollständige Dokumentation zu pypi oder importieren Sie das Modul und führen Sie es aus help(tqdm).

BEARBEITEN


Um die ursprüngliche Frage direkt zu beantworten, ersetzen Sie:

df_users.groupby(['userID', 'requestDate']).apply(feature_rollup)

mit:

from tqdm import tqdm
tqdm.pandas()
df_users.groupby(['userID', 'requestDate']).progress_apply(feature_rollup)

Hinweis: tqdm <= v4.8 : Für Versionen von tqdm unter 4.8 mussten Sie stattdessen Folgendes tqdm.pandas()tun:

from tqdm import tqdm, tqdm_pandas
tqdm_pandas(tqdm())
casper.dcl
quelle
5
tqdmwurde ursprünglich nur für einfache iterables erstellt: from tqdm import tqdm; for i in tqdm( range(int(1e8)) ): passDie Pandas-Unterstützung war ein kürzlich von mir durchgeführter Hack :)
casper.dcl
6
Übrigens, wenn Sie Jupyter-Notizbücher verwenden, können Sie auch tqdm_notebooks verwenden, um eine schönere Leiste zu erhalten. Zusammen mit Pandas müssten Sie es derzeit instanziieren, wie from tqdm import tqdm_notebook; tqdm_notebook().pandas(*args, **kwargs) hier zu sehen
grinsbaeckchen
2
Ab Version 4.8.1 verwenden Sie stattdessen tqdm.pandas (). github.com/tqdm/tqdm/commit/…
mork
1
Danke, @mork ist richtig. Wir arbeiten (langsam) an tqdmv5, wodurch die Dinge modularer werden.
casper.dcl
1
Aktuelle Syntaxempfehlungen finden Sie in der Dokumentation zu tqdm Pandas hier: pypi.python.org/pypi/tqdm#pandas-integration
Manu CJ
18

Jeffs Antwort optimieren (und dies als wiederverwendbare Funktion haben).

def logged_apply(g, func, *args, **kwargs):
    step_percentage = 100. / len(g)
    import sys
    sys.stdout.write('apply progress:   0%')
    sys.stdout.flush()

    def logging_decorator(func):
        def wrapper(*args, **kwargs):
            progress = wrapper.count * step_percentage
            sys.stdout.write('\033[D \033[D' * 4 + format(progress, '3.0f') + '%')
            sys.stdout.flush()
            wrapper.count += 1
            return func(*args, **kwargs)
        wrapper.count = 0
        return wrapper

    logged_func = logging_decorator(func)
    res = g.apply(logged_func, *args, **kwargs)
    sys.stdout.write('\033[D \033[D' * 4 + format(100., '3.0f') + '%' + '\n')
    sys.stdout.flush()
    return res

Hinweis: Der Prozentsatz für die Aktualisierung des Fortschritts inline wird aktualisiert . Wenn Ihre Funktion nicht funktioniert, funktioniert dies nicht.

In [11]: g = df_users.groupby(['userID', 'requestDate'])

In [12]: f = feature_rollup

In [13]: logged_apply(g, f)
apply progress: 100%
Out[13]: 
...

Wie üblich können Sie dies als Methode zu Ihren groupby-Objekten hinzufügen:

from pandas.core.groupby import DataFrameGroupBy
DataFrameGroupBy.logged_apply = logged_apply

In [21]: g.logged_apply(f)
apply progress: 100%
Out[21]: 
...

Wie in den Kommentaren erwähnt, ist dies keine Funktion, an deren Implementierung Kernpandas interessiert wären. Mit Python können Sie diese jedoch für viele Pandas-Objekte / -Methoden erstellen (dies wäre ein ziemlicher Arbeitsaufwand ... obwohl Sie diesen Ansatz verallgemeinern sollten).

Andy Hayden
quelle
Ich sage "ziemlich viel Arbeit", aber Sie könnten wahrscheinlich diese gesamte Funktion als (allgemeinerer) Dekorateur umschreiben.
Andy Hayden
Vielen Dank, dass Sie Jeffs Beitrag erweitert haben. Ich habe beide implementiert und die Verlangsamung ist jeweils minimal (insgesamt wurden 1,1 Minuten zu einem Vorgang hinzugefügt, der 27 Minuten dauerte). Auf diese Weise kann ich den Fortschritt sehen und angesichts des Ad-hoc-Charakters dieser Operationen halte ich dies für eine akzeptable Verlangsamung.
cwharland
Ausgezeichnet, froh, dass es geholfen hat. Ich war tatsächlich überrascht über die Verlangsamung (als ich ein Beispiel ausprobierte), ich erwartete, dass es viel schlimmer sein würde.
Andy Hayden
1
Um die Effizienz der veröffentlichten Methoden weiter zu steigern, war ich beim Datenimport faul (Pandas ist einfach zu gut im Umgang mit unordentlichem CSV !!) und einige meiner Einträge (~ 1%) hatten vollständig Einfügungen ausgeblendet (denken Sie ganz nach in einzelne Felder eingefügte Datensätze). Das Eliminieren dieser Funktionen führt zu einer massiven Beschleunigung des Feature-Rollups, da keine Unklarheiten darüber bestehen, was bei Split-Apply-Combine-Vorgängen zu tun ist.
Cwharland
1
Ich habe nur noch 8 Minuten Zeit ... aber ich habe dem Feature-Rollup etwas hinzugefügt (mehr Funktionen -> bessere AUC!). Diese 8 Minuten sind pro Block (derzeit insgesamt zwei Blöcke), wobei sich jeder Block in der Nähe von 12 Millionen Zeilen befindet. Also ja ... 16 Minuten, um mit HDFStore umfangreiche Operationen an 24 Millionen Zeilen durchzuführen (und es gibt nltk-Zeug im Feature-Rollup). Ziemlich gut. Hoffen wir, dass das Internet mich nicht nach der anfänglichen Ignoranz oder Ambivalenz gegenüber den durcheinandergebrachten Einfügungen
beurteilt
11

Falls Sie Unterstützung für die Verwendung in einem Jupyter / ipython-Notizbuch benötigen, wie ich es getan habe, finden Sie hier eine hilfreiche Anleitung und Quelle für den entsprechenden Artikel :

from tqdm._tqdm_notebook import tqdm_notebook
import pandas as pd
tqdm_notebook.pandas()
df = pd.DataFrame(np.random.randint(0, int(1e8), (10000, 1000)))
df.groupby(0).progress_apply(lambda x: x**2)

Beachten Sie den Unterstrich in der Importanweisung für _tqdm_notebook. Wie in dem Artikel erwähnt, befindet sich die Entwicklung in der späten Beta-Phase.

Victor Vulovic
quelle
8

Für alle, die tqdm auf ihren benutzerdefinierten parallelen Pandas-Apply-Code anwenden möchten.

(Ich habe im Laufe der Jahre einige der Bibliotheken für die Parallelisierung ausprobiert, aber ich habe nie eine 100% ige Parallelisierungslösung gefunden, hauptsächlich für die Apply-Funktion, und ich musste immer zurückkommen, um meinen "manuellen" Code zu erhalten.)

df_multi_core - das ist das, was du anrufst . Es akzeptiert:

  1. Dein df Objekt
  2. Der Funktionsname, den Sie aufrufen möchten
  3. Die Teilmenge der Spalten, für die die Funktion ausgeführt werden kann (hilft, Zeit / Speicher zu reduzieren)
  4. Die Anzahl der Jobs, die parallel ausgeführt werden sollen (-1 oder für alle Kerne weglassen)
  5. Alle anderen kwargs, die die Funktion des df akzeptiert (wie "Achse")

_df_split - Dies ist eine interne Hilfsfunktion , die global zum laufenden Modul positioniert werden muss (Pool.map ist "platzierungsabhängig"), andernfalls würde ich sie intern lokalisieren.

Hier ist der Code aus meinem Kern (ich werde dort weitere Pandas-Funktionstests hinzufügen):

import pandas as pd
import numpy as np
import multiprocessing
from functools import partial

def _df_split(tup_arg, **kwargs):
    split_ind, df_split, df_f_name = tup_arg
    return (split_ind, getattr(df_split, df_f_name)(**kwargs))

def df_multi_core(df, df_f_name, subset=None, njobs=-1, **kwargs):
    if njobs == -1:
        njobs = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=njobs)

    try:
        splits = np.array_split(df[subset], njobs)
    except ValueError:
        splits = np.array_split(df, njobs)

    pool_data = [(split_ind, df_split, df_f_name) for split_ind, df_split in enumerate(splits)]
    results = pool.map(partial(_df_split, **kwargs), pool_data)
    pool.close()
    pool.join()
    results = sorted(results, key=lambda x:x[0])
    results = pd.concat([split[1] for split in results])
    return results

Unten ist ein Testcode für eine parallelisierte Anwendung mit tqdm "progress_apply".

from time import time
from tqdm import tqdm
tqdm.pandas()

if __name__ == '__main__': 
    sep = '-' * 50

    # tqdm progress_apply test      
    def apply_f(row):
        return row['c1'] + 0.1
    N = 1000000
    np.random.seed(0)
    df = pd.DataFrame({'c1': np.arange(N), 'c2': np.arange(N)})

    print('testing pandas apply on {}\n{}'.format(df.shape, sep))
    t1 = time()
    res = df.progress_apply(apply_f, axis=1)
    t2 = time()
    print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
    print('time for native implementation {}\n{}'.format(round(t2 - t1, 2), sep))

    t3 = time()
    # res = df_multi_core(df=df, df_f_name='apply', subset=['c1'], njobs=-1, func=apply_f, axis=1)
    res = df_multi_core(df=df, df_f_name='progress_apply', subset=['c1'], njobs=-1, func=apply_f, axis=1)
    t4 = time()
    print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
    print('time for multi core implementation {}\n{}'.format(round(t4 - t3, 2), sep))

In der Ausgabe sehen Sie 1 Fortschrittsbalken für die Ausführung ohne Parallelisierung und Fortschrittsbalken pro Kern für die Ausführung mit Parallelisierung. Es gibt ein leichtes Hickup und manchmal erscheinen die restlichen Kerne sofort, aber selbst dann denke ich, dass es nützlich ist, da Sie die Fortschrittsstatistiken pro Kern erhalten (z. B. it / sec und Gesamtaufzeichnungen).

Geben Sie hier die Bildbeschreibung ein

Vielen Dank an @abcdaa für diese großartige Bibliothek!

mork
quelle
1
Danke @mork - zögern Sie nicht, github.com/tqdm/tqdm/wiki/How-to-make-a-great-Progress-Bar hinzuzufügen oder eine neue Seite unter github.com/tqdm/tqdm/wiki
casper zu erstellen. dcl
Vielen Dank, musste aber diesen Teil ändern: try: splits = np.array_split(df[subset], njobs) except ValueError: splits = np.array_split(df, njobs)Wechseln Sie aufgrund der KeyError-Ausnahme anstelle von ValueError zu Exception, um alle Fälle zu behandeln.
Marius
Danke @mork - diese Antwort sollte höher sein.
Andy
5

Sie können dies leicht mit einem Dekorateur tun

from functools import wraps 

def logging_decorator(func):

    @wraps
    def wrapper(*args, **kwargs):
        wrapper.count += 1
        print "The function I modify has been called {0} times(s).".format(
              wrapper.count)
        func(*args, **kwargs)
    wrapper.count = 0
    return wrapper

modified_function = logging_decorator(feature_rollup)

Verwenden Sie dann einfach die modifizierte_Funktion (und ändern Sie sie, wenn Sie möchten, dass sie gedruckt wird).

Jeff
quelle
1
Offensichtliche Warnung, dies wird Ihre Funktion verlangsamen! Sie können es sogar mit dem Fortschritt stackoverflow.com/questions/5426546/… aktualisieren lassen, z. B. count / len als Prozentsatz.
Andy Hayden
Ja - Sie haben Ordnung (Anzahl der Gruppen). Je nachdem, was Ihr Engpass ist, kann dies einen Unterschied machen
Jeff
Vielleicht ist es intuitiv, dies in eine logged_apply(g, func)Funktion zu packen, in der Sie Zugriff auf die Bestellung haben und von Anfang an protokollieren können.
Andy Hayden
Ich habe das oben genannte in meiner Antwort getan, auch freche prozentuale Aktualisierung. Eigentlich konnte ich deine nicht zum Laufen bringen ... Ich denke mit den Wraps. Wenn Sie es für die Anwendung verwenden, ist es sowieso nicht so wichtig.
Andy Hayden
1

Ich habe Jeffs Antwort dahingehend geändert , dass sie eine Summe enthält, sodass Sie den Fortschritt und eine Variable verfolgen können, um nur alle X-Iterationen zu drucken (dies verbessert die Leistung tatsächlich um ein Vielfaches, wenn "print_at" einigermaßen hoch ist).

def count_wrapper(func,total, print_at):

    def wrapper(*args):
        wrapper.count += 1
        if wrapper.count % wrapper.print_at == 0:
            clear_output()
            sys.stdout.write( "%d / %d"%(calc_time.count,calc_time.total) )
            sys.stdout.flush()
        return func(*args)
    wrapper.count = 0
    wrapper.total = total
    wrapper.print_at = print_at

    return wrapper

Die Funktion clear_output () stammt von

from IPython.core.display import clear_output

Wenn nicht auf IPython, macht Andy Haydens Antwort das ohne

Filipe Silva
quelle