Effizientes paralleles Anwenden einer Funktion auf einen gruppierten Pandas DataFrame

88

Ich muss oft eine Funktion auf die Gruppen eines sehr großen DataFrame(gemischten Datentyps) anwenden und möchte mehrere Kerne nutzen.

Ich kann einen Iterator aus den Gruppen erstellen und das Multiprozessor-Modul verwenden, aber es ist nicht effizient, da jede Gruppe und die Ergebnisse der Funktion für die Nachrichtenübermittlung zwischen Prozessen ausgewählt werden müssen.

Gibt es eine Möglichkeit, das Beizen oder sogar das DataFramevollständige Kopieren zu vermeiden ? Es sieht so aus, als ob die Shared-Memory-Funktionen der Multiprocessing-Module auf numpyArrays beschränkt sind . Gibt es noch andere Möglichkeiten?

user2303
quelle
Soweit ich weiß, gibt es keine Möglichkeit, beliebige Objekte zu teilen. Ich frage mich, ob das Beizen so viel länger dauert als der Gewinn durch Mehrfachverarbeitung. Vielleicht sollten Sie nach einer Möglichkeit suchen, größere Arbeitspakete für jeden Prozess zu erstellen, um die relative Beizzeit zu verkürzen. Eine andere Möglichkeit wäre die Verwendung von Multiprocessing, wenn Sie die Gruppen erstellen.
Sebastian Werk
3
Ich mache so etwas, aber benutze UWSGI, Flask und Preforking: Ich lade den Pandas-Datenrahmen in einen Prozess, verzweige ihn x-mal (mache ihn zu einem Shared-Memory-Objekt) und rufe diese Prozesse dann von einem anderen Python-Prozess aus auf, in dem ich die Ergebnisse konzentriere. atm Ich benutze JSON als Kommunikationsprozess, aber dies kommt (noch sehr experimentell): pandas.pydata.org/pandas-docs/dev/io.html#msgpack-experimental
Carst
Haben Sie sich HDF5 schon einmal mit Chunking angesehen? (HDF5 ist nicht für gleichzeitiges Schreiben gespeichert, aber Sie können auch speichern, um Dateien zu trennen und am Ende Sachen zu verketten)
Carst
7
Dies wird für 0.14 angestrebt, siehe diese Ausgabe: github.com/pydata/pandas/issues/5751
Jeff
4
@ Jeff wurde auf 0,15 = (
pyCthon

Antworten:

12

Aus den obigen Kommentaren geht hervor, dass dies für pandaseinige Zeit geplant ist (es gibt auch ein interessant aussehendes rosettaProjekt, das mir gerade aufgefallen ist).

Bis jedoch alle parallelen Funktionen integriert sind pandas, ist mir aufgefallen, dass es sehr einfach ist, effiziente und nicht speicherkopierende parallele Erweiterungen pandasdirekt mit cython+ OpenMP und C ++ zu schreiben .

Hier ist ein kurzes Beispiel für das Schreiben einer parallelen Gruppensumme, deren Verwendung ungefähr so ​​ist:

import pandas as pd
import para_group_demo

df = pd.DataFrame({'a': [1, 2, 1, 2, 1, 1, 0], 'b': range(7)})
print para_group_demo.sum(df.a, df.b)

und Ausgabe ist:

     sum
key     
0      6
1      11
2      4

Hinweis Zweifellos wird die Funktionalität dieses einfachen Beispiels irgendwann Teil davon sein pandas. Einige Dinge sind jedoch für einige Zeit natürlicher in C ++ zu parallelisieren, und es ist wichtig zu wissen, wie einfach es ist, dies zu kombinieren pandas.


Zu diesem Zweck habe ich eine einfache Dateierweiterung aus einer Quelle geschrieben, deren Code folgt.

Es beginnt mit einigen Importen und Typdefinitionen

from libc.stdint cimport int64_t, uint64_t
from libcpp.vector cimport vector
from libcpp.unordered_map cimport unordered_map

cimport cython
from cython.operator cimport dereference as deref, preincrement as inc
from cython.parallel import prange

import pandas as pd

ctypedef unordered_map[int64_t, uint64_t] counts_t
ctypedef unordered_map[int64_t, uint64_t].iterator counts_it_t
ctypedef vector[counts_t] counts_vec_t

Der C ++ - unordered_mapTyp dient zum Summieren durch einen einzelnen Thread und der vectorzum Summieren durch alle Threads.

Nun zur Funktion sum. Es beginnt mit getippten Speicheransichten für den schnellen Zugriff:

def sum(crit, vals):
    cdef int64_t[:] crit_view = crit.values
    cdef int64_t[:] vals_view = vals.values

Die Funktion wird fortgesetzt, indem das Semi-Equal auf die Threads aufgeteilt wird (hier fest auf 4 codiert) und jeder Thread die Einträge in seinem Bereich summiert:

    cdef uint64_t num_threads = 4
    cdef uint64_t l = len(crit)
    cdef uint64_t s = l / num_threads + 1
    cdef uint64_t i, j, e
    cdef counts_vec_t counts
    counts = counts_vec_t(num_threads)
    counts.resize(num_threads)
    with cython.boundscheck(False):
        for i in prange(num_threads, nogil=True): 
            j = i * s
            e = j + s
            if e > l:
                e = l
            while j < e:
                counts[i][crit_view[j]] += vals_view[j]
                inc(j)

Wenn die Threads abgeschlossen sind, führt die Funktion alle Ergebnisse (aus den verschiedenen Bereichen) zu einem einzigen zusammen unordered_map:

    cdef counts_t total
    cdef counts_it_t it, e_it
    for i in range(num_threads):
        it = counts[i].begin()
        e_it = counts[i].end()
        while it != e_it:
            total[deref(it).first] += deref(it).second
            inc(it)        

Sie müssen nur noch ein erstellen DataFrameund die Ergebnisse zurückgeben:

    key, sum_ = [], []
    it = total.begin()
    e_it = total.end()
    while it != e_it:
        key.append(deref(it).first)
        sum_.append(deref(it).second)
        inc(it)

    df = pd.DataFrame({'key': key, 'sum': sum_})
    df.set_index('key', inplace=True)
    return df
Ami Tavory
quelle