Anwenden der Python-Funktion auf Pandas gruppierten DataFrame - Was ist der effizienteste Ansatz, um die Berechnungen zu beschleunigen?

9

Ich habe es mit ziemlich großen Pandas DataFrame zu tun - mein Datensatz ähnelt einem folgenden dfSetup:

import pandas as pd
import numpy  as np

#--------------------------------------------- SIZING PARAMETERS :
R1 =                    20        # .repeat( repeats = R1 )
R2 =                    10        # .repeat( repeats = R2 )
R3 =                541680        # .repeat( repeats = [ R3, R4 ] )
R4 =                576720        # .repeat( repeats = [ R3, R4 ] )
T  =                 55920        # .tile( , T)
A1 = np.arange( 0, 2708400, 100 ) # ~ 20x re-used
A2 = np.arange( 0, 2883600, 100 ) # ~ 20x re-used

#--------------------------------------------- DataFrame GENERATION :
df = pd.DataFrame.from_dict(
         { 'measurement_id':        np.repeat( [0, 1], repeats = [ R3, R4 ] ), 
           'time':np.concatenate( [ np.repeat( A1,     repeats = R1 ),
                                    np.repeat( A2,     repeats = R1 ) ] ), 
           'group':        np.tile( np.repeat( [0, 1], repeats = R2 ), T ),
           'object':       np.tile( np.arange( 0, R1 ),                T )
           }
        )

#--------------------------------------------- DataFrame RE-PROCESSING :
df = pd.concat( [ df,
                  df                                                  \
                    .groupby( ['measurement_id', 'time', 'group'] )    \
                    .apply( lambda x: np.random.uniform( 0, 100, 10 ) ) \
                    .explode()                                           \
                    .astype( 'float' )                                    \
                    .to_frame( 'var' )                                     \
                    .reset_index( drop = True )
                  ], axis = 1
                )

Hinweis: Um ein minimales Beispiel zu haben, kann es leicht untergeordnet werden (zum Beispiel mit df.loc[df['time'] <= 400, :]), aber da ich die Daten trotzdem simuliere, dachte ich, dass die Originalgröße einen besseren Überblick geben würde.

Für jede von definierte Gruppe muss ['measurement_id', 'time', 'group']ich die folgende Funktion aufrufen:

from sklearn.cluster import SpectralClustering
from pandarallel     import pandarallel

def cluster( x, index ):
    if len( x ) >= 2:
        data = np.asarray( x )[:, np.newaxis]
        clustering = SpectralClustering( n_clusters   =  5,
                                         random_state = 42
                                         ).fit( data )
        return pd.Series( clustering.labels_ + 1, index = index )
    else:
        return pd.Series( np.nan, index = index )

Um die Leistung zu verbessern, habe ich zwei Ansätze ausprobiert:

Pandarallel-Paket

Der erste Ansatz bestand darin, die Berechnungen mit dem pandarallelPaket zu parallelisieren :

pandarallel.initialize( progress_bar = True )
df \
  .groupby( ['measurement_id', 'time', 'group'] ) \
  .parallel_apply( lambda x: cluster( x['var'], x['object'] ) )

Dies scheint jedoch nicht optimal zu sein, da es viel RAM verbraucht und nicht alle Kerne für Berechnungen verwendet werden (obwohl die Anzahl der Kerne in der pandarallel.initialize()Methode explizit angegeben wurde ). Manchmal werden Berechnungen auch mit verschiedenen Fehlern beendet, obwohl ich keine Gelegenheit hatte, einen Grund dafür zu finden (möglicherweise ein Mangel an RAM?).

PySpark Pandas UDF

Ich habe auch einen Spark Pandas UDF ausprobiert, obwohl ich für Spark völlig neu bin. Hier ist mein Versuch:

import findspark;  findspark.init()

from pyspark.sql           import SparkSession
from pyspark.conf          import SparkConf
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types     import *

spark = SparkSession.builder.master( "local" ).appName( "test" ).config( conf = SparkConf() ).getOrCreate()
df = spark.createDataFrame( df )

@pandas_udf( StructType( [StructField( 'id', IntegerType(), True )] ), functionType = PandasUDFType.GROUPED_MAP )
def cluster( df ):
    if len( df['var'] ) >= 2:
        data = np.asarray( df['var'] )[:, np.newaxis]
        clustering = SpectralClustering( n_clusters   =  5,
                                         random_state = 42
                                         ).fit( data )
        return pd.DataFrame( clustering.labels_ + 1,
                             index = df['object']
                             )
    else:
        return pd.DataFrame( np.nan,
                             index = df['object']
                             )

res = df                                           \
        .groupBy( ['id_half', 'frame', 'team_id'] ) \
        .apply( cluster )                            \
        .toPandas()

Leider war auch die Leistung unbefriedigend, und nach dem, was ich zu diesem Thema gelesen habe, ist dies möglicherweise nur die Belastung durch die Verwendung der in Python geschriebenen UDF-Funktion und die damit verbundene Notwendigkeit, alle Python-Objekte in Spark-Objekte und zurück zu konvertieren.

Also hier sind meine Fragen:

  1. Könnte einer meiner Ansätze angepasst werden, um mögliche Engpässe zu beseitigen und die Leistung zu verbessern? (zB PySpark-Setup, Anpassen nicht optimaler Operationen usw.)
  2. Sind sie bessere Alternativen? Wie vergleichen sie sich hinsichtlich der Leistung mit den angebotenen Lösungen?
Kuba_
quelle
2
hast du dask recherchiert ?
Danila Ganchar
1
Noch nicht, aber danke für deinen Vorschlag - ich werde es
versuchen
Leider habe ich nicht mit gearbeitet dask(((mein Kommentar ist also nur ein Ratschlag für die Forschung)
Danila Ganchar
Mit Leistung meinte ich Zeit, in der Berechnungen abgeschlossen werden können.
Kuba_

Antworten:

1

F : " Könnte einer meiner Ansätze angepasst werden , um mögliche Engpässe zu beseitigen und die Leistung zu verbessern? (ZB PySpark-Setup, Anpassen nicht optimaler Vorgänge usw.) "

+1für die Erwähnung der Einrichtungs -Zusatzkosten für beide Rechenstrategien. Dies macht immer einen Break-Even-Punkt, nach dem eine Nicht- [SERIAL]Strategie eine vorteilhafte Freude an einer gewünschten [TIME]Domänen - Beschleunigung erzielen kann (wenn andere, normalerweise - [SPACE]Domänenkosten dies zulassen oder machbar bleiben - ja, RAM. .. Existenz und Zugriff auf ein Gerät dieser Größe, Budget und andere ähnliche reale Einschränkungen)

Erstens
die Überprüfung vor dem Flug vor dem Start

Die neue, Overhead-strenge Formulierung des Amdahlschen Gesetzes kann derzeit diese beiden zusätzlichen pSO + pTOOverheads berücksichtigen und diese bei der Vorhersage der erreichbaren Geschwindigkeitssteigerungen einschließlich der Gewinnschwelle widerspiegeln Punkt, ab dem es sinnvoll werden kann (im Sinne von Kosten / Effekt, Effizienz), parallel zu gehen.

Geben Sie hier die Bildbeschreibung ein

Dies
ist hier jedoch nicht unser Kernproblem .
Das kommt als nächstes:

Angesichts
der Rechenkosten von SpectralClustering(), die hier für die Verwendung des Kernels der radialen Boltzmann-Funktion verwendet ~ exp( -gamma * distance( data, data )**2 )werden, scheint es keinen Fortschritt bei der Aufteilung des dataObjekts auf eine beliebige Anzahl von disjunkten Arbeitseinheiten zu geben, da die distance( data, data )Komponente per Definition nur dazu verpflichtet ist Besuchen Sie alle dataElemente (siehe Kommunikationskosten für beliebig { process | node }verteilte Topologien, die den Wert übergeben, sind aus offensichtlichen Gründen schrecklich schlecht, wenn nicht die schlechtesten Anwendungsfälle für die { process | node }verteilte Verarbeitung, wenn nicht die direkten Anti-Muster (mit Ausnahme einiger tatsächlich arkaner, speicherloser / zustandsloser, jedoch rechnergestützter Strukturen).

Für pedantisch Analysten, ja - fügen Sie diese (und wir können schon sagen schlechten Zustand) die Kosten für die - wieder - Any-to-any k-means -Verarbeitung, hier über O( N^( 1 + 5 * 5 ) )das geht, denn N ~ len( data ) ~ 1.12E6+, furchtbar gegen unseren Wunsch einige haben intelligente und schnelle Verarbeitung.

Na und?

Während die Rüstkosten nicht außer Acht gelassen werden, die erhöhte Kommunikationskosten werden von fast sicher deaktivieren eine Verbesserung der oben skizzierten Versuche unter Verwendung von einem rein- zu bewegen [SERIAL]Prozessablauf in irgendeiner Form von nur - [CONCURRENT]oder True - [PARALLEL]Orchestrierung einiger Arbeitsuntereinheiten Aufgrund des erhöhten Overheads im Zusammenhang mit einem Muss für die Implementierung (eines Tandempaars von) Topologien, die alle Werte übergeben.

Wenn sie nicht wären?

Nun, das klingt wie ein Oxymoron der Informatik - selbst wenn es möglich wäre, die Kosten für die vorab berechneten Entfernungen (die diese immensen [TIME]Kosten für die Domänenkomplexität "im Voraus" verursachen würden (Wo? Wie? Gibt es welche?) Eine andere, nicht vermeidbare Latenz, die eine mögliche Latenzmaskierung durch einen (bisher unbekannten) inkrementellen Aufbau einer in Zukunft vollständigen Abstandsmatrix ermöglicht, würde diese hauptsächlich vorhandenen Kosten nur an einem anderen Ort in neu positionieren [TIME]- und [SPACE]-Domains, nicht reduzieren.

F : "Sind sie bessere Alternativen? "

Das einzige, ich bin so weit weg bewusst, ist zu versuchen, wenn das Problem möglich ist , neu formuliert in eine andere zu bekommen, ein QUBO formulierten Problem Mode (Ref .: Q uantum- U nconstrained- B inary- O ptimisation Eine gute Nachricht ist, dass Tools dafür, eine Basis aus Wissen aus erster Hand und praktische Erfahrung bei der Problemlösung vorhanden sind und größer werden.

F : Wie vergleichen sie sich hinsichtlich der Leistung mit den bereitgestellten Lösungen ?

Die Leistung ist atemberaubend - QUBO-formuliertes Problem hat einen vielversprechenden O(1)(!) Löser in konstanter Zeit (in [TIME]-Domain) und etwas eingeschränkt in [SPACE]-Domain (wo kürzlich angekündigte LLNL-Tricks helfen können, diese physische Welt zu vermeiden, aktuelle QPU-Implementierung, Problembeschränkung Größen).

user3666197
quelle
Dies ist eine interessante Antwort, scheint aber den Punkt zu verfehlen - OP trainiert mehrere kleine Modelle, nicht ein einziges. Ihre Kernbeobachtung ist also meistens irrelevant.
user10938362
@ user10938362 Wie wird Ihre beanspruchte Immobilie (Schulung kleiner Modelle) in eine andere als die oben angegebene Big-O-Metrik der Verarbeitungskosten übersetzt? Sicher, viele kleinere Modelle versprechen eine theoretisch nur linear wachsende Summe der (noch) großen O-Kosten der einzelnen Verarbeitung (jetzt kleiner in N, aber nicht in anderen Faktoren) , aber Sie müssen noch eine schrecklich teurere Summe hinzufügen Add-On-Kosten sowohl für die Einrichtungs- als auch für die Terminierungs-Overhead-Kosten zuzüglich aller Add-On-Kommunikations-Overhead-Kosten (Parameter / Daten / Ergebnisse + normalerweise auch Paare von SER / DES-Verarbeitungskosten in jedem Schritt)
user3666197
0

Dies ist keine Antwort, aber ...

Wenn du läufst

df.groupby(['measurement_id', 'time', 'group']).apply(
    lambda x: cluster(x['var'], x['object']))

(dh nur mit Pandas) werden Sie feststellen, dass Sie bereits mehrere Kerne verwenden. Dies liegt daran , sklearnAnwendungen joblibstandardmäßig um die Arbeit zu parallelisieren. Du kannst den Scheduler gegen Dask austauschen und möglicherweise mehr Effizienz beim Teilen der Daten zwischen Threads erzielen. Solange Ihre Arbeit jedoch so CPU-gebunden ist, können Sie nichts tun, um sie zu beschleunigen.

Kurz gesagt, dies ist ein Algorithmusproblem: Finden Sie heraus, was Sie wirklich berechnen müssen, bevor Sie versuchen, verschiedene Frameworks für die Berechnung in Betracht zu ziehen.

mdurant
quelle
Könnten Sie bitte erklären, warum Sie "... das Teilen der Daten zwischen Threads ..." erwähnen, nachdem die Arbeitsteilung durch joblibgespawnte Prozesse organisiert wurde, die nichts mit Threads zu tun haben, umso weniger mit dem Teilen? Vielen Dank für Ihre freundliche Klärung der Argumente.
user3666197
Genau genommen verwendet jboblib normalerweise Prozesse, aber es kann alternativ dask als Backend verwenden, wo Sie Ihre Mischung aus Threads und Prozessen auswählen können.
mdurant
Ich bin ein Neuling im Parallel-Computing, aber selbst wenn sklearn Parallelisierung verwendet, ist es in diesen Einstellungen nicht nutzlos? Ich meine, Operationen, die von sklearn ausgeführt werden, sind extrem einfach, da jede Clusteroperation nur auf 10 Punkte angewendet wird. Auch hier könnte ich mich irren, aber ich denke, die Art und Weise, wie wir die Verarbeitung von Blöcken von Originaldaten parallelisieren, ist das eigentliche Problem.
Kuba_
"
Ist
0

Ich bin kein Experte für Dask, aber ich biete den folgenden Code als Basis:

import dask.dataframe as ddf

df = ddf.from_pandas(df, npartitions=4) # My PC has 4 cores

task = df.groupby(["measurement_id", "time", "group"]).apply(
    lambda x: cluster(x["var"], x["object"]),
    meta=pd.Series(np.nan, index=pd.Series([0, 1, 1, 1])),
)

res = task.compute()
durchgeknallt
quelle