Pandas DataFrame anwenden lassen () alle Kerne verwenden?

95

Ab August 2017 ist Pandas DataFame.apply () leider immer noch auf die Arbeit mit einem einzelnen Kern beschränkt, was bedeutet, dass ein Multi-Core-Computer den größten Teil seiner Rechenzeit beim Ausführen verschwendet df.apply(myfunc, axis=1).

Wie können Sie alle Ihre Kerne verwenden, um die Anwendung auf einem Datenrahmen parallel auszuführen?

Roko Mijic
quelle

Antworten:

68

Sie können das swifterPaket verwenden:

pip install swifter

Es funktioniert als Plugin für Pandas und ermöglicht es Ihnen, die applyFunktion wiederzuverwenden :

import swifter

def some_function(data):
    return data * 10

data['out'] = data['in'].swifter.apply(some_function)

Es wird automatisch herausgefunden, wie die Funktion am effizientesten parallelisiert werden kann, unabhängig davon, ob sie vektorisiert ist (wie im obigen Beispiel) oder nicht.

Weitere Beispiele und einen Leistungsvergleich finden Sie auf GitHub. Beachten Sie, dass sich das Paket in der aktiven Entwicklung befindet, sodass sich die API möglicherweise ändern kann.

Beachten Sie auch, dass dies für Zeichenfolgenspalten nicht automatisch funktioniert . Bei Verwendung von Strings greift Swifter auf einen „einfachen“ Pandas zurück apply, der nicht parallel ist. In diesem Fall führt das Erzwingen der Verwendung dasknicht zu Leistungsverbesserungen, und Sie sollten Ihr Dataset besser manuell aufteilen und mithilfe von parallelisierenmultiprocessing .

slhck
quelle
1
Gibt es aus purer Neugier eine Möglichkeit, die Anzahl der Kerne zu begrenzen, die bei der parallelen Anwendung verwendet werden? Ich habe einen gemeinsam genutzten Server. Wenn ich also alle 32 Kerne ergreife, ist niemand glücklich.
Maksim Khaitovich
1
@ MaximHaytovich Ich weiß es nicht. Swifter verwendet dask im Hintergrund, daher werden möglicherweise die folgenden Einstellungen berücksichtigt : stackoverflow.com/a/40633117/435093 - andernfalls würde ich empfehlen, ein Problem auf GitHub zu öffnen. Der Autor ist sehr reaktionsschnell.
Slhck
@slhck danke! Werde es ein bisschen mehr graben. Es scheint sowieso nicht auf Windows Server zu funktionieren - hängt nur daran, nichts an Spielzeugaufgaben zu tun
Maksim Khaitovich
Können
ak3191
2
allow_dask_on_strings(enable=True)df.swifter.allow_dask_on_strings(enable=True).apply(some_function)
Fügen Sie
96

Am einfachsten ist es, die map_partitions von Dask zu verwenden . Sie benötigen diese Importe (Sie müssen pip install dask):

import pandas as pd
import dask.dataframe as dd
from dask.multiprocessing import get

und die Syntax ist

data = <your_pandas_dataframe>
ddata = dd.from_pandas(data, npartitions=30)

def myfunc(x,y,z, ...): return <whatever>

res = ddata.map_partitions(lambda df: df.apply((lambda row: myfunc(*row)), axis=1)).compute(get=get)  

(Ich glaube, dass 30 eine geeignete Anzahl von Partitionen ist, wenn Sie 16 Kerne haben). Der Vollständigkeit halber habe ich den Unterschied auf meiner Maschine (16 Kerne) zeitlich festgelegt:

data = pd.DataFrame()
data['col1'] = np.random.normal(size = 1500000)
data['col2'] = np.random.normal(size = 1500000)

ddata = dd.from_pandas(data, npartitions=30)
def myfunc(x,y): return y*(x**2+1)
def apply_myfunc_to_DF(df): return df.apply((lambda row: myfunc(*row)), axis=1)
def pandas_apply(): return apply_myfunc_to_DF(data)
def dask_apply(): return ddata.map_partitions(apply_myfunc_to_DF).compute(get=get)  
def vectorized(): return myfunc(data['col1'], data['col2']  )

t_pds = timeit.Timer(lambda: pandas_apply())
print(t_pds.timeit(number=1))

28.16970546543598

t_dsk = timeit.Timer(lambda: dask_apply())
print(t_dsk.timeit(number=1))

2.708152851089835

t_vec = timeit.Timer(lambda: vectorized())
print(t_vec.timeit(number=1))

0,010668013244867325

Wenn Sie einen Faktor von 10 für die Beschleunigung von Pandas angeben , gilt dies für dask auf Partitionen. Wenn Sie eine Funktion haben, die Sie vektorisieren können, sollten Sie dies natürlich tun - in diesem Fall ist die Funktion ( y*(x**2+1)) trivial vektorisiert, aber es gibt viele Dinge, die nicht vektorisiert werden können.

Roko Mijic
quelle
2
Schön zu wissen, danke fürs posten. Können Sie erklären, warum Sie 30 Partitionen ausgewählt haben? Ändert sich die Leistung, wenn dieser Wert geändert wird?
Andrew L
2
@AndrewL Ich gehe davon aus, dass jede Partition von einem separaten Prozess bedient wird, und mit 16 Kernen gehe ich davon aus, dass entweder 16 oder 32 Prozesse gleichzeitig ausgeführt werden können. Ich habe es ausprobiert und die Leistung scheint sich auf bis zu 32 Partitionen zu verbessern, aber weitere Erhöhungen haben keine vorteilhaften Auswirkungen. Ich gehe davon aus, dass Sie mit einer Quad-Core-Maschine 8 Partitionen usw. benötigen würden. Beachten Sie, dass ich eine Verbesserung zwischen 16 und 32 festgestellt habe. Ich denke, Sie möchten wirklich 2x $ NUM_PROCESSORS
Roko Mijic
8
Das einzige istThe get= keyword has been deprecated. Please use the scheduler= keyword instead with the name of the desired scheduler like 'threads' or 'processes'
Worte für den
4
Verwenden Sie für dask v0.20.0 und höher ddata.map_partitions (Lambda df: df.apply ((Lambda-Zeile: myfunc (* Zeile)), Achse = 1)). Compute (Scheduler = 'Prozesse') oder eine der andere Scheduler-Optionen. Der aktuelle Code löst "TypeError: Das Schlüsselwort get = wurde entfernt. Bitte verwenden Sie stattdessen das Schlüsselwort scheduler = mit dem Namen des gewünschten Schedulers wie 'threads' oder '
process
1
Stellen Sie sicher, dass der Datenrahmen beim Auslösen keine doppelten Indizes enthält ValueError: cannot reindex from a duplicate axis. Um dies zu umgehen, sollten Sie entweder doppelte Indizes von entfernen df = df[~df.index.duplicated()]oder Ihre Indizes von zurücksetzen df.reset_index(inplace=True).
Habib Karbasian
23

Sie können pandarallelstattdessen Folgendes versuchen : Ein einfaches und effizientes Tool zum Parallelisieren Ihrer Pandas-Vorgänge auf allen Ihren CPUs (unter Linux und MacOS)

  • Die Parallelisierung ist mit Kosten verbunden (neue Prozesse initiieren, Daten über den gemeinsam genutzten Speicher senden usw.). Daher ist die Parallelisierung nur dann wirksam, wenn der Rechenaufwand für die Parallelisierung hoch genug ist. Bei sehr geringen Datenmengen lohnt sich die Verwendung der Parallezierung nicht immer.
  • Die angewendeten Funktionen sollten KEINE Lambda-Funktionen sein.
from pandarallel import pandarallel
from math import sin

pandarallel.initialize()

# FORBIDDEN
df.parallel_apply(lambda x: sin(x**2), axis=1)

# ALLOWED
def func(x):
    return sin(x**2)

df.parallel_apply(func, axis=1)

Siehe https://github.com/nalepae/pandarallel

G_KOBELIEF
quelle
Hallo, ich kann ein Problem nicht lösen. Bei Verwendung von Pandarallel gibt es einen Fehler: AttributeError: Das lokale Objekt 'prepare_worker. <locals> .closure. <locals> .wrapper' kann nicht ausgewählt werden. Kannst du mir dabei helfen?
Alex Cam
@ Alex Sry Ich bin nicht der Entwickler dieses Moduls. Wie sehen Ihre Codes aus? Sie können versuchen, Ihre "Insider-Funktionen" als global zu deklarieren? (nur raten)
G_KOBELIEF
@AlexCam Ihre Funktion sollte außerhalb einer anderen Funktion definiert werden, damit Python sie für die Mehrfachverarbeitung auswählen kann
Kenan,
@G_KOBELIEF Mit Python> 3.6 können wir die Lambda-Funktion mit pandaparallel verwenden
user110244
13

Wenn Sie in nativem Python bleiben möchten:

import multiprocessing as mp

with mp.Pool(mp.cpu_count()) as pool:
    df['newcol'] = pool.map(f, df['col'])

wendet die Funktion fparallel auf die Spalte coldes Datenrahmens andf

Olivier Cruchant
quelle
Nach einem solchen Ansatz bekam ich einen ValueError: Length of values does not match length of indexvon __setitem__in pandas/core/frame.py. Ich bin mir nicht sicher, ob ich etwas falsch gemacht habe oder ob das Zuweisen zu df['newcol']nicht threadsicher ist.
Rassel
2
Sie können die pool.map in eine Zwischenliste temp_result schreiben, um zu überprüfen, ob die Länge mit dem df übereinstimmt, und dann ein df ['newcol'] = temp_result?
Olivier Cruchant
Du meinst die neue Spalte erstellen? was würdest du benutzen
Olivier Cruchant
Ja, das Ergebnis der Karte wird der neuen Spalte des Datenrahmens zugewiesen. Gibt map nicht eine Liste des Ergebnisses jedes an die Funktion f gesendeten Blocks zurück? Was passiert also, wenn Sie das der Spalte 'newcol' zuweisen? Verwenden von Pandas und Python 3
Mina
Es funktioniert tatsächlich sehr reibungslos! Hast Du es versucht? Es wird eine Liste mit der gleichen Länge des df erstellt, in der gleichen Reihenfolge wie das, was gesendet wurde. Es macht buchstäblich c2 = f (c1) parallel. Es gibt keinen einfacheren Weg, um in Python mehrere Prozesse durchzuführen. In Bezug auf die Leistung scheint es, dass Ray auch gute Dinge tun kann (in Richtung datascience.com/… ), aber es ist nicht so ausgereift und die Installation läuft meiner Erfahrung nach nicht immer reibungslos
Olivier Cruchant
1

Hier ist ein Beispiel eines sklearn-Basistransformators, bei dem Pandas parallelisiert sind

import multiprocessing as mp
from sklearn.base import TransformerMixin, BaseEstimator

class ParllelTransformer(BaseEstimator, TransformerMixin):
    def __init__(self,
                 n_jobs=1):
        """
        n_jobs - parallel jobs to run
        """
        self.variety = variety
        self.user_abbrevs = user_abbrevs
        self.n_jobs = n_jobs
    def fit(self, X, y=None):
        return self
    def transform(self, X, *_):
        X_copy = X.copy()
        cores = mp.cpu_count()
        partitions = 1

        if self.n_jobs <= -1:
            partitions = cores
        elif self.n_jobs <= 0:
            partitions = 1
        else:
            partitions = min(self.n_jobs, cores)

        if partitions == 1:
            # transform sequentially
            return X_copy.apply(self._transform_one)

        # splitting data into batches
        data_split = np.array_split(X_copy, partitions)

        pool = mp.Pool(cores)

        # Here reduce function - concationation of transformed batches
        data = pd.concat(
            pool.map(self._preprocess_part, data_split)
        )

        pool.close()
        pool.join()
        return data
    def _transform_part(self, df_part):
        return df_part.apply(self._transform_one)
    def _transform_one(self, line):
        # some kind of transformations here
        return line

Weitere Informationen finden Sie unter https://towardsdatascience.com/4-easy-steps-to-improve-your-machine-learning-code-performance-88a0b0eeffa8

Maxim Balatsko
quelle