Ich habe es mit ziemlich großen Pandas DataFrame zu tun - mein Datensatz ähnelt einem folgenden df
Setup:
import pandas as pd
import numpy as np
#--------------------------------------------- SIZING PARAMETERS :
R1 = 20 # .repeat( repeats = R1 )
R2 = 10 # .repeat( repeats = R2 )
R3 = 541680 # .repeat( repeats = [ R3, R4 ] )
R4 = 576720 # .repeat( repeats = [ R3, R4 ] )
T = 55920 # .tile( , T)
A1 = np.arange( 0, 2708400, 100 ) # ~ 20x re-used
A2 = np.arange( 0, 2883600, 100 ) # ~ 20x re-used
#--------------------------------------------- DataFrame GENERATION :
df = pd.DataFrame.from_dict(
{ 'measurement_id': np.repeat( [0, 1], repeats = [ R3, R4 ] ),
'time':np.concatenate( [ np.repeat( A1, repeats = R1 ),
np.repeat( A2, repeats = R1 ) ] ),
'group': np.tile( np.repeat( [0, 1], repeats = R2 ), T ),
'object': np.tile( np.arange( 0, R1 ), T )
}
)
#--------------------------------------------- DataFrame RE-PROCESSING :
df = pd.concat( [ df,
df \
.groupby( ['measurement_id', 'time', 'group'] ) \
.apply( lambda x: np.random.uniform( 0, 100, 10 ) ) \
.explode() \
.astype( 'float' ) \
.to_frame( 'var' ) \
.reset_index( drop = True )
], axis = 1
)
Hinweis: Um ein minimales Beispiel zu haben, kann es leicht untergeordnet werden (zum Beispiel mit df.loc[df['time'] <= 400, :]
), aber da ich die Daten trotzdem simuliere, dachte ich, dass die Originalgröße einen besseren Überblick geben würde.
Für jede von definierte Gruppe muss ['measurement_id', 'time', 'group']
ich die folgende Funktion aufrufen:
from sklearn.cluster import SpectralClustering
from pandarallel import pandarallel
def cluster( x, index ):
if len( x ) >= 2:
data = np.asarray( x )[:, np.newaxis]
clustering = SpectralClustering( n_clusters = 5,
random_state = 42
).fit( data )
return pd.Series( clustering.labels_ + 1, index = index )
else:
return pd.Series( np.nan, index = index )
Um die Leistung zu verbessern, habe ich zwei Ansätze ausprobiert:
Pandarallel-Paket
Der erste Ansatz bestand darin, die Berechnungen mit dem pandarallel
Paket zu parallelisieren :
pandarallel.initialize( progress_bar = True )
df \
.groupby( ['measurement_id', 'time', 'group'] ) \
.parallel_apply( lambda x: cluster( x['var'], x['object'] ) )
Dies scheint jedoch nicht optimal zu sein, da es viel RAM verbraucht und nicht alle Kerne für Berechnungen verwendet werden (obwohl die Anzahl der Kerne in der pandarallel.initialize()
Methode explizit angegeben wurde ). Manchmal werden Berechnungen auch mit verschiedenen Fehlern beendet, obwohl ich keine Gelegenheit hatte, einen Grund dafür zu finden (möglicherweise ein Mangel an RAM?).
PySpark Pandas UDF
Ich habe auch einen Spark Pandas UDF ausprobiert, obwohl ich für Spark völlig neu bin. Hier ist mein Versuch:
import findspark; findspark.init()
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *
spark = SparkSession.builder.master( "local" ).appName( "test" ).config( conf = SparkConf() ).getOrCreate()
df = spark.createDataFrame( df )
@pandas_udf( StructType( [StructField( 'id', IntegerType(), True )] ), functionType = PandasUDFType.GROUPED_MAP )
def cluster( df ):
if len( df['var'] ) >= 2:
data = np.asarray( df['var'] )[:, np.newaxis]
clustering = SpectralClustering( n_clusters = 5,
random_state = 42
).fit( data )
return pd.DataFrame( clustering.labels_ + 1,
index = df['object']
)
else:
return pd.DataFrame( np.nan,
index = df['object']
)
res = df \
.groupBy( ['id_half', 'frame', 'team_id'] ) \
.apply( cluster ) \
.toPandas()
Leider war auch die Leistung unbefriedigend, und nach dem, was ich zu diesem Thema gelesen habe, ist dies möglicherweise nur die Belastung durch die Verwendung der in Python geschriebenen UDF-Funktion und die damit verbundene Notwendigkeit, alle Python-Objekte in Spark-Objekte und zurück zu konvertieren.
Also hier sind meine Fragen:
- Könnte einer meiner Ansätze angepasst werden, um mögliche Engpässe zu beseitigen und die Leistung zu verbessern? (zB PySpark-Setup, Anpassen nicht optimaler Operationen usw.)
- Sind sie bessere Alternativen? Wie vergleichen sie sich hinsichtlich der Leistung mit den angebotenen Lösungen?
dask
(((mein Kommentar ist also nur ein Ratschlag für die Forschung)Antworten:
+1
für die Erwähnung der Einrichtungs -Zusatzkosten für beide Rechenstrategien. Dies macht immer einen Break-Even-Punkt, nach dem eine Nicht-[SERIAL]
Strategie eine vorteilhafte Freude an einer gewünschten[TIME]
Domänen - Beschleunigung erzielen kann (wenn andere, normalerweise -[SPACE]
Domänenkosten dies zulassen oder machbar bleiben - ja, RAM. .. Existenz und Zugriff auf ein Gerät dieser Größe, Budget und andere ähnliche reale Einschränkungen)Erstens
die Überprüfung vor dem Flug vor dem Start
Die neue, Overhead-strenge Formulierung des Amdahlschen Gesetzes kann derzeit diese beiden zusätzlichen
pSO + pTO
Overheads berücksichtigen und diese bei der Vorhersage der erreichbaren Geschwindigkeitssteigerungen einschließlich der Gewinnschwelle widerspiegeln Punkt, ab dem es sinnvoll werden kann (im Sinne von Kosten / Effekt, Effizienz), parallel zu gehen.Dies
ist hier jedoch nicht unser Kernproblem .
Das kommt als nächstes:
Angesichts
der Rechenkosten von
SpectralClustering()
, die hier für die Verwendung des Kernels der radialen Boltzmann-Funktion verwendet~ exp( -gamma * distance( data, data )**2 )
werden, scheint es keinen Fortschritt bei der Aufteilung desdata
Objekts auf eine beliebige Anzahl von disjunkten Arbeitseinheiten zu geben, da diedistance( data, data )
Komponente per Definition nur dazu verpflichtet ist Besuchen Sie alledata
Elemente (siehe Kommunikationskosten für beliebig{ process | node }
verteilte Topologien, die den Wert übergeben, sind aus offensichtlichen Gründen schrecklich schlecht, wenn nicht die schlechtesten Anwendungsfälle für die{ process | node }
verteilte Verarbeitung, wenn nicht die direkten Anti-Muster (mit Ausnahme einiger tatsächlich arkaner, speicherloser / zustandsloser, jedoch rechnergestützter Strukturen).Für pedantisch Analysten, ja - fügen Sie diese (und wir können schon sagen schlechten Zustand) die Kosten für die - wieder - Any-to-any k-means -Verarbeitung, hier über
O( N^( 1 + 5 * 5 ) )
das geht, dennN ~ len( data ) ~ 1.12E6+
, furchtbar gegen unseren Wunsch einige haben intelligente und schnelle Verarbeitung.Na und?
Während die Rüstkosten nicht außer Acht gelassen werden, die erhöhte Kommunikationskosten werden von fast sicher deaktivieren eine Verbesserung der oben skizzierten Versuche unter Verwendung von einem rein- zu bewegen
[SERIAL]
Prozessablauf in irgendeiner Form von nur -[CONCURRENT]
oder True -[PARALLEL]
Orchestrierung einiger Arbeitsuntereinheiten Aufgrund des erhöhten Overheads im Zusammenhang mit einem Muss für die Implementierung (eines Tandempaars von) Topologien, die alle Werte übergeben.Wenn sie nicht wären?
Nun, das klingt wie ein Oxymoron der Informatik - selbst wenn es möglich wäre, die Kosten für die vorab berechneten Entfernungen (die diese immensen
[TIME]
Kosten für die Domänenkomplexität "im Voraus" verursachen würden (Wo? Wie? Gibt es welche?) Eine andere, nicht vermeidbare Latenz, die eine mögliche Latenzmaskierung durch einen (bisher unbekannten) inkrementellen Aufbau einer in Zukunft vollständigen Abstandsmatrix ermöglicht, würde diese hauptsächlich vorhandenen Kosten nur an einem anderen Ort in neu positionieren[TIME]
- und[SPACE]
-Domains, nicht reduzieren.Das einzige, ich bin so weit weg bewusst, ist zu versuchen, wenn das Problem möglich ist , neu formuliert in eine andere zu bekommen, ein QUBO formulierten Problem Mode (Ref .: Q uantum- U nconstrained- B inary- O ptimisation Eine gute Nachricht ist, dass Tools dafür, eine Basis aus Wissen aus erster Hand und praktische Erfahrung bei der Problemlösung vorhanden sind und größer werden.
Die Leistung ist atemberaubend - QUBO-formuliertes Problem hat einen vielversprechenden
O(1)
(!) Löser in konstanter Zeit (in[TIME]
-Domain) und etwas eingeschränkt in[SPACE]
-Domain (wo kürzlich angekündigte LLNL-Tricks helfen können, diese physische Welt zu vermeiden, aktuelle QPU-Implementierung, Problembeschränkung Größen).quelle
Dies ist keine Antwort, aber ...
Wenn du läufst
(dh nur mit Pandas) werden Sie feststellen, dass Sie bereits mehrere Kerne verwenden. Dies liegt daran ,
sklearn
Anwendungenjoblib
standardmäßig um die Arbeit zu parallelisieren. Du kannst den Scheduler gegen Dask austauschen und möglicherweise mehr Effizienz beim Teilen der Daten zwischen Threads erzielen. Solange Ihre Arbeit jedoch so CPU-gebunden ist, können Sie nichts tun, um sie zu beschleunigen.Kurz gesagt, dies ist ein Algorithmusproblem: Finden Sie heraus, was Sie wirklich berechnen müssen, bevor Sie versuchen, verschiedene Frameworks für die Berechnung in Betracht zu ziehen.
quelle
joblib
gespawnte Prozesse organisiert wurde, die nichts mit Threads zu tun haben, umso weniger mit dem Teilen? Vielen Dank für Ihre freundliche Klärung der Argumente.Ich bin kein Experte für
Dask
, aber ich biete den folgenden Code als Basis:quelle