Gibt es eine einfache Möglichkeit, pandas.DataFrame.isin parallel auszuführen?

25

Ich habe ein Modellierungs- und Bewertungsprogramm, das die DataFrame.isinFunktion von Pandas stark nutzt und Listen von Facebook-Like-Einträgen einzelner Benutzer für jede von mehreren tausend spezifischen Seiten durchsucht. Dies ist der zeitaufwändigste Teil des Programms, mehr als das Modellieren oder Bewerten von Stücken, einfach weil es nur auf einem Kern läuft, während der Rest auf ein paar Dutzend gleichzeitig läuft.

Obwohl ich weiß, dass ich den Datenrahmen manuell in Blöcke aufteilen und den Vorgang parallel ausführen kann, gibt es eine einfache Möglichkeit, dies automatisch zu tun? Mit anderen Worten, gibt es eine Art Paket, das erkennt, dass ich eine leicht delegierbare Operation ausführe und diese automatisch verteile? Vielleicht ist das zu viel verlangt, aber ich war in der Vergangenheit genug überrascht von dem, was bereits in Python verfügbar ist, daher denke ich, dass es sich lohnt, danach zu fragen.

Alle anderen Vorschläge, wie dies erreicht werden könnte (auch wenn nicht durch ein magisches Einhornpaket!), Wären ebenfalls willkommen. Ich versuche nur, einen Weg zu finden, um 15 bis 20 Minuten pro Lauf zu sparen, ohne die gleiche Zeit für das Codieren der Lösung aufzuwenden.

Therriault
quelle
Wie groß ist Ihre Werteliste? Haben Sie versucht, es als Set zu übergeben? Aus Gründen der Parallelität könnte Sie Joblib interessieren. Es ist einfach zu bedienen und kann Berechnungen beschleunigen. Verwenden Sie es mit großen Datenmengen.
oao
Eine andere Möglichkeit besteht darin, Ihr Problem als Join neu zu definieren. Beitritte sind viel schneller in Pandas stackoverflow.com/questions/23945493/…
Brian Spiering
Eine weitere Option ist die Verwendung von np.in1d, die auch schneller ist. Stackoverflow.com/questions/21738882/fast-pandas-filtering
Brian Spiering

Antworten:

8

Leider ist die Parallelisierung bei Pandas noch nicht implementiert. Sie können sich dieser Github-Ausgabe anschließen, wenn Sie an der Entwicklung dieser Funktion teilnehmen möchten.

Ich kenne kein "magisches Einhorn-Paket" für diese Zwecke, daher ist es das Beste, eine eigene Lösung zu schreiben. Aber wenn Sie trotzdem keine Zeit damit verbringen möchten und etwas Neues lernen möchten, können Sie die beiden in MongoDB integrierten Methoden (Map Reduce und Agg Framework) ausprobieren. Siehe mongodb_agg_framework .

Stanpol
quelle
6

Ich denke, Ihre beste Wette wäre Rosetta . Ich finde es sehr nützlich und einfach. Überprüfen Sie die Pandas-Methoden .

Sie können es per Pip bekommen .

dmvianna
quelle
Ich würde empfehlen, Rosetta zu bekommen, indem ich direkt zu GitHub gehe. Das stellt sicher, dass Sie die neueste Version erhalten. github.com/columbia-applied-data-science/rosetta
Ian Langmore
0

Es gibt eine häufigere Version dieser Frage bezüglich der Parallelisierung der Pandas Apply- Funktion - dies ist also eine erfrischende Frage :)

Zunächst möchte ich Swifter erwähnen, da Sie nach einer "gepackten" Lösung gefragt haben und dies auf den meisten SO-Fragen in Bezug auf die Parallelisierung von Pandas erscheint.

Aber ich möchte trotzdem meinen persönlichen Hauptcode dafür weitergeben, da ich nach einigen Jahren der Arbeit mit DataFrame nie eine 100% -Parallelisierungslösung (hauptsächlich für die Apply-Funktion) gefunden habe und immer wieder zurückkehren musste, um meinen " Handbuch "Code.

Dank Ihnen habe ich es allgemeiner gestaltet, jede (theoretisch) DataFrame-Methode mit ihrem Namen zu unterstützen (damit Sie keine Versionen für isin, apply usw. behalten müssen).

Ich habe es auf "isin" -, "apply" - und "isna" -Funktionen mit Python 2.7 und 3.6 getestet. Es ist unter 20 Zeilen, und ich folgte der Pandas Namenskonvention wie "Teilmenge" und "NJOBS".

Ich habe auch einen Zeitvergleich mit dem dask-äquivalenten Code für "isin" hinzugefügt und es scheint ~ X2-mal langsamer zu sein als dieser Kern.

Es beinhaltet 2 Funktionen:

df_multi_core - das ist das, was du nennst. Es akzeptiert:

  1. Dein df Objekt
  2. Der Funktionsname, den Sie aufrufen möchten
  3. Die Teilmenge der Spalten, für die die Funktion ausgeführt werden kann (hilft, Zeit / Speicher zu reduzieren)
  4. Die Anzahl der Jobs, die parallel ausgeführt werden sollen (-1 oder für alle Kerne weggelassen)
  5. Alle anderen kwargs, die die df-Funktion akzeptiert (wie "axis")

_df_split - Dies ist eine interne Hilfsfunktion , die global zum laufenden Modul positioniert werden muss (Pool.map ist "Placement-abhängig"), andernfalls würde ich sie intern lokalisieren.

Hier ist der Code aus meiner Übersicht (ich werde dort weitere Pandas-Funktionstests hinzufügen):

import pandas as pd
import numpy as np
import multiprocessing
from functools import partial

def _df_split(tup_arg, **kwargs):
    split_ind, df_split, df_f_name = tup_arg
    return (split_ind, getattr(df_split, df_f_name)(**kwargs))

def df_multi_core(df, df_f_name, subset=None, njobs=-1, **kwargs):
    if njobs == -1:
        njobs = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=njobs)

    try:
        splits = np.array_split(df[subset], njobs)
    except ValueError:
        splits = np.array_split(df, njobs)

    pool_data = [(split_ind, df_split, df_f_name) for split_ind, df_split in enumerate(splits)]
    results = pool.map(partial(_df_split, **kwargs), pool_data)
    pool.close()
    pool.join()
    results = sorted(results, key=lambda x:x[0])
    results = pd.concat([split[1] for split in results])
    return results

Der folgende Code ist ein Testcode für ein parallelisiertes isin , in dem die native Mehrkernleistung mit der Leistung von dask verglichen wird. Auf einer I7-Maschine mit 8 physischen Kernen habe ich ungefähr die vierfache Geschwindigkeit erreicht. Ich würde gerne hören, was Sie von Ihren realen Daten erhalten!

from time import time

if __name__ == '__main__': 
    sep = '-' * 50

    # isin test
    N = 10000000
    df = pd.DataFrame({'c1': np.random.randint(low=1, high=N, size=N), 'c2': np.arange(N)})
    lookfor = np.random.randint(low=1, high=N, size=1000000)

    print('{}\ntesting pandas isin on {}\n{}'.format(sep, df.shape, sep))
    t1 = time()
    print('result\n{}'.format(df.isin(lookfor).sum()))
    t2 = time()
    print('time for native implementation {}\n{}'.format(round(t2 - t1, 2), sep))

    t3 = time()
    res = df_multi_core(df=df, df_f_name='isin', subset=['c1'], njobs=-1, values=lookfor)
    print('result\n{}'.format(res.sum()))
    t4 = time()
    print('time for multi core implementation {}\n{}'.format(round(t4 - t3, 2), sep))


    t5 = time()
    ddata = dd.from_pandas(df, npartitions=njobs)
    res = ddata.map_partitions(lambda df: df.apply(apply_f, axis=1)).compute(scheduler='processes')
    t6 = time()
    print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
    print('time for dask implementation {}\n{}'.format(round(t6 - t5, 2), sep))

--------------------------------------------------
testing pandas isin on (10000000, 2)
--------------------------------------------------
result
c1    953213
c2    951942
dtype: int64
time for native implementation 3.87
--------------------------------------------------
result
c1    953213
dtype: int64
time for multi core implementation 1.16
--------------------------------------------------
result
c1    953213
c2    951942
dtype: int64
time for dask implementation 2.88
mork
quelle
@Therriault Ich habe einen Dask-Vergleich mit isin- es scheint, dass das Code-Snippet mit 'isin' am effektivsten ist - ~ X1.75-mal schneller als dask (im Vergleich zu der applyFunktion, die nur 5% schneller als dask wurde)
am