Ich bevorzuge Python gegenüber Scala. Da Spark nativ in Scala geschrieben ist, hatte ich aus offensichtlichen Gründen erwartet, dass mein Code in der Scala schneller ausgeführt wird als in der Python-Version.
Mit dieser Annahme dachte ich, ich würde die Scala-Version eines sehr gängigen Vorverarbeitungscodes für etwa 1 GB Daten lernen und schreiben. Die Daten stammen aus dem SpringLeaf-Wettbewerb auf Kaggle . Nur um einen Überblick über die Daten zu geben (es enthält 1936 Dimensionen und 145232 Zeilen). Daten bestehen aus verschiedenen Typen, z. B. int, float, string, boolean. Ich verwende 6 von 8 Kernen für die Spark-Verarbeitung. Deshalb habe ich verwendet, minPartitions=6
damit jeder Kern etwas zu verarbeiten hat.
Scala Code
val input = sc.textFile("train.csv", minPartitions=6)
val input2 = input.mapPartitionsWithIndex { (idx, iter) =>
if (idx == 0) iter.drop(1) else iter }
val delim1 = "\001"
def separateCols(line: String): Array[String] = {
val line2 = line.replaceAll("true", "1")
val line3 = line2.replaceAll("false", "0")
val vals: Array[String] = line3.split(",")
for((x,i) <- vals.view.zipWithIndex) {
vals(i) = "VAR_%04d".format(i) + delim1 + x
}
vals
}
val input3 = input2.flatMap(separateCols)
def toKeyVal(line: String): (String, String) = {
val vals = line.split(delim1)
(vals(0), vals(1))
}
val input4 = input3.map(toKeyVal)
def valsConcat(val1: String, val2: String): String = {
val1 + "," + val2
}
val input5 = input4.reduceByKey(valsConcat)
input5.saveAsTextFile("output")
Python-Code
input = sc.textFile('train.csv', minPartitions=6)
DELIM_1 = '\001'
def drop_first_line(index, itr):
if index == 0:
return iter(list(itr)[1:])
else:
return itr
input2 = input.mapPartitionsWithIndex(drop_first_line)
def separate_cols(line):
line = line.replace('true', '1').replace('false', '0')
vals = line.split(',')
vals2 = ['VAR_%04d%s%s' %(e, DELIM_1, val.strip('\"'))
for e, val in enumerate(vals)]
return vals2
input3 = input2.flatMap(separate_cols)
def to_key_val(kv):
key, val = kv.split(DELIM_1)
return (key, val)
input4 = input3.map(to_key_val)
def vals_concat(v1, v2):
return v1 + ',' + v2
input5 = input4.reduceByKey(vals_concat)
input5.saveAsTextFile('output')
Scala Performance Stufe 0 (38 Minuten), Stufe 1 (18 Sek.)
Python Performance Stufe 0 (11 Minuten), Stufe 1 (7 Sek.)
Beide erzeugen unterschiedliche DAG-Visualisierungsgraphen (aufgrund derer beide Bilder unterschiedliche Funktionen der Stufe 0 für Scala ( map
) und Python ( reduceByKey
) zeigen).
Im Wesentlichen versuchen beide Codes jedoch, Daten in RDD (dimension_id, Zeichenfolgenliste) umzuwandeln und auf der Festplatte zu speichern. Die Ausgabe wird verwendet, um verschiedene Statistiken für jede Dimension zu berechnen.
In Bezug auf die Leistung scheint der Scala-Code für diese realen Daten viermal langsamer zu laufen als die Python-Version. Eine gute Nachricht für mich ist, dass es mich gut motiviert hat, bei Python zu bleiben. Schlechte Nachrichten sind, dass ich nicht ganz verstanden habe, warum?
quelle
Antworten:
Die ursprüngliche Antwort zum Code finden Sie unten.
Zunächst müssen Sie zwischen verschiedenen API-Typen mit jeweils eigenen Leistungsaspekten unterscheiden.
RDD-API
(reine Python-Strukturen mit JVM-basierter Orchestrierung)
Dies ist die Komponente, die am stärksten von der Leistung des Python-Codes und den Details der PySpark-Implementierung betroffen ist. Während es unwahrscheinlich ist, dass die Python-Leistung ein Problem darstellt, müssen Sie zumindest einige Faktoren berücksichtigen:
Prozessbasierte Executoren (Python) versus threadbasierte Executoren (einzelne JVM-Threads mit mehreren Threads) (Scala). Jeder Python-Executor wird in einem eigenen Prozess ausgeführt. Als Nebeneffekt bietet es eine stärkere Isolation als sein JVM-Gegenstück und eine gewisse Kontrolle über den Executor-Lebenszyklus, jedoch möglicherweise eine erheblich höhere Speichernutzung:
Leistung des Python-Codes selbst. Im Allgemeinen ist Scala schneller als Python, variiert jedoch von Aufgabe zu Aufgabe. Darüber hinaus haben Sie mehrere Optionen, einschließlich JITs wie Numba , C-Erweiterungen ( Cython ) oder Spezialbibliotheken wie Theano . Schließlich ,
wenn Sie verwenden ML / MLlib (oder einfach NumPy Stack) nicht, prüfen , mit PyPy als Alternative Dolmetscher. Siehe SPARK-3094 .spark.python.worker.reuse
Option, mit der Sie zwischen dem Verzweigen des Python-Prozesses für jede Aufgabe und der Wiederverwendung eines vorhandenen Prozesses wählen können. Die letztere Option scheint nützlich zu sein, um eine teure Speicherbereinigung zu vermeiden (dies ist eher ein Eindruck als ein Ergebnis systematischer Tests), während die erstere (Standard) für teure Sendungen und Importe optimal ist.MLlib
(gemischte Python- und JVM-Ausführung)
Grundlegende Überlegungen sind mit einigen zusätzlichen Problemen ziemlich identisch. Während mit MLlib verwendete Grundstrukturen einfache Python-RDD-Objekte sind, werden alle Algorithmen direkt mit Scala ausgeführt.
Dies bedeutet zusätzliche Kosten für die Konvertierung von Python-Objekten in Scala-Objekte und umgekehrt, eine erhöhte Speichernutzung und einige zusätzliche Einschränkungen, die wir später behandeln werden.
Ab sofort (Spark 2.x) befindet sich die RDD-basierte API in einem Wartungsmodus und soll in Spark 3.0 entfernt werden .
DataFrame API und Spark ML
(JVM-Ausführung mit auf den Treiber beschränktem Python-Code)
Dies ist wahrscheinlich die beste Wahl für Standarddatenverarbeitungsaufgaben. Da Python-Code hauptsächlich auf logische Operationen auf hoher Ebene des Treibers beschränkt ist, sollte es keinen Leistungsunterschied zwischen Python und Scala geben.
Eine einzige Ausnahme ist die Verwendung zeilenweiser Python-UDFs, die erheblich weniger effizient sind als ihre Scala-Entsprechungen. Zwar gibt es einige Verbesserungsmöglichkeiten (Spark 2.0.0 wurde erheblich weiterentwickelt), die größte Einschränkung besteht jedoch in der vollständigen Hin- und Rückfahrt zwischen der internen Darstellung (JVM) und dem Python-Interpreter. Wenn möglich, sollten Sie eine Komposition integrierter Ausdrücke bevorzugen ( Beispiel : Das Verhalten von Python UDF wurde in Spark 2.0.0 verbessert, ist jedoch im Vergleich zur nativen Ausführung immer noch nicht optimal.
Dies
könnte sich in Zukunftverbessern, da die vektorisierten UDFs (SPARK-21190 und weitere Erweiterungen) eingeführt wurden , die Arrow Streaming für einen effizienten Datenaustausch mit Null-Kopien-Deserialisierung verwenden. Bei den meisten Anwendungen können die sekundären Gemeinkosten einfach ignoriert werden.Vermeiden Sie außerdem unnötige Datenübertragungen zwischen
DataFrames
undRDDs
. Dies erfordert eine teure Serialisierung und Deserialisierung, ganz zu schweigen von der Datenübertragung zum und vom Python-Interpreter.Es ist erwähnenswert, dass Py4J-Anrufe eine ziemlich hohe Latenz haben. Dies beinhaltet einfache Anrufe wie:
Normalerweise sollte es keine Rolle spielen (der Overhead ist konstant und hängt nicht von der Datenmenge ab), aber bei weichen Echtzeitanwendungen können Sie das Zwischenspeichern / Wiederverwenden von Java-Wrappern in Betracht ziehen.
GraphX- und Spark-DataSets
Derzeit (Spark
GraphX1.62.1) bietet keiner die PySpark-API, sodass Sie sagen können, dass PySpark unendlich schlechter ist als Scala.In der Praxis wurde die GraphX-Entwicklung fast vollständig gestoppt und das Projekt befindet sich derzeit im Wartungsmodus. Die zugehörigen JIRA-Tickets wurden geschlossen, da dies nicht behoben werden kann . Die GraphFrames- Bibliothek bietet eine alternative Grafikverarbeitungsbibliothek mit Python-Bindungen.
DatensatzSubjektiv gesehen gibt es
Datasets
in Python nicht viel Platz für statische Eingaben, und selbst wenn es die aktuelle Scala-Implementierung gab, ist sie zu simpel und bietet nicht die gleichen Leistungsvorteile wieDataFrame
.Streaming
Nach dem, was ich bisher gesehen habe, würde ich dringend empfehlen, Scala über Python zu verwenden. Es kann sich in Zukunft ändern, wenn PySpark Unterstützung für strukturierte Streams erhält, aber derzeit scheint die Scala-API viel robuster, umfassender und effizienter zu sein. Meine Erfahrung ist ziemlich begrenzt.
Strukturiertes Streaming in Spark 2.x scheint die Lücke zwischen den Sprachen zu verringern, befindet sich jedoch noch in den Anfängen. Trotzdem wird die RDD-basierte API bereits in der Databricks-Dokumentation (Datum des Zugriffs 2017-03-03) als "Legacy-Streaming" bezeichnet, sodass mit weiteren Vereinigungsbemühungen zu rechnen ist.
Überlegungen zur Nichterfüllung
Feature-ParitätNicht alle Spark-Funktionen werden über die PySpark-API verfügbar gemacht. Überprüfen Sie unbedingt, ob die benötigten Teile bereits implementiert sind, und versuchen Sie, mögliche Einschränkungen zu verstehen.
Dies ist besonders wichtig, wenn Sie MLlib und ähnliche gemischte Kontexte verwenden (siehe Aufrufen der Java / Scala-Funktion von einer Aufgabe aus ). Um fair zu sein
API-Designmllib.linalg
, bieten einige Teile der PySpark-API eine umfassendere Reihe von Methoden als Scala.Die PySpark-API spiegelt genau das Scala-Gegenstück wider und ist als solche nicht genau Pythonic. Dies bedeutet, dass es ziemlich einfach ist, zwischen Sprachen zuzuordnen, aber gleichzeitig kann es erheblich schwieriger sein, Python-Code zu verstehen.
Komplexe ArchitekturDer PySpark-Datenfluss ist im Vergleich zur reinen JVM-Ausführung relativ komplex. Es ist viel schwieriger, über PySpark-Programme oder Debugging nachzudenken. Darüber hinaus ist zumindest ein grundlegendes Verständnis von Scala und JVM im Allgemeinen ein Muss.
Spark 2.x und darüber hinausDie fortlaufende Umstellung auf
Dataset
API mit eingefrorener RDD-API bietet Python-Benutzern sowohl Chancen als auch Herausforderungen. Während übergeordnete Teile der API in Python viel einfacher verfügbar zu machen sind, können die erweiterten Funktionen kaum direkt verwendet werden .Darüber hinaus sind native Python-Funktionen weiterhin Bürger zweiter Klasse in der SQL-Welt. Hoffentlich wird sich dies in Zukunft durch die Apache Arrow-Serialisierung verbessern ( aktuelle Bemühungen zielen auf Daten ab,
collection
aber UDF-Serde ist ein langfristiges Ziel ).Für Projekte, die stark von der Python-Codebasis abhängen, könnten reine Python-Alternativen (wie Dask oder Ray ) eine interessante Alternative sein.
Es muss nicht eins gegen das andere sein
Die Spark DataFrame-API (SQL, Dataset) bietet eine elegante Möglichkeit, Scala / Java-Code in die PySpark-Anwendung zu integrieren. Sie können
DataFrames
Daten einem nativen JVM-Code aussetzen und die Ergebnisse zurücklesen. Ich habe einige Optionen an einer anderen Stelle erläutert. Ein funktionierendes Beispiel für eine Python-Scala-Rundreise finden Sie unter Verwenden einer Scala-Klasse in Pyspark .Es kann durch die Einführung benutzerdefinierter Typen weiter erweitert werden (siehe Definieren des Schemas für benutzerdefinierte Typen in Spark SQL? ).
Was ist falsch an dem in der Frage angegebenen Code?
(Haftungsausschluss: Pythonista-Standpunkt. Höchstwahrscheinlich habe ich einige Scala-Tricks verpasst)
Zuallererst gibt es einen Teil in Ihrem Code, der überhaupt keinen Sinn ergibt. Wenn Sie bereits
(key, value)
Paare mit erstellt habenzipWithIndex
oderenumerate
was bringt es, einen String zu erstellen, um ihn direkt danach zu teilen?flatMap
funktioniert nicht rekursiv, so dass Sie einfach Tupel ausgeben und das Folgen überspringen könnenmap
.Ein anderer Teil, den ich problematisch finde, ist
reduceByKey
. Im AllgemeinenreduceByKey
ist dies nützlich, wenn durch Anwenden der Aggregatfunktion die Datenmenge reduziert werden muss, die gemischt werden muss. Da Sie Zeichenfolgen einfach verketten, gibt es hier nichts zu gewinnen. Wenn Sie Dinge auf niedriger Ebene wie die Anzahl der Referenzen ignorieren, ist die Datenmenge, die Sie übertragen müssen, genau die gleiche wie fürgroupByKey
.Normalerweise würde ich nicht weiter darauf eingehen, aber soweit ich das beurteilen kann, handelt es sich um einen Engpass in Ihrem Scala-Code. Das Verbinden von Strings in JVM ist eine ziemlich teure Operation (siehe zum Beispiel: Ist die Verkettung von Strings in Scala genauso kostspielig wie in Java? ). Dies bedeutet, dass so etwas,
_.reduceByKey((v1: String, v2: String) => v1 + ',' + v2)
dasinput4.reduceByKey(valsConcat)
Ihrem Code entspricht, keine gute Idee ist.Wenn Sie vermeiden möchten,
groupByKey
können Sie versuchen,aggregateByKey
mit zu verwendenStringBuilder
. Ähnliches sollte den Trick machen:aber ich bezweifle, dass es die ganze Aufregung wert ist.
Unter Berücksichtigung der obigen Punkte habe ich Ihren Code wie folgt umgeschrieben:
Scala :
Python :
Ergebnisse
Im
local[6]
Modus (Intel (R) Xeon (R) CPU E3-1245 V2 bei 3,40 GHz) mit 4 GB Speicher pro Executor (n = 3):Ich bin mir ziemlich sicher, dass die meiste Zeit für das Mischen, Serialisieren, Deserialisieren und andere sekundäre Aufgaben aufgewendet wird. Nur zum Spaß, hier ist naiver Single-Threaded-Code in Python, der dieselbe Aufgabe auf diesem Computer in weniger als einer Minute ausführt:
quelle
Erweiterung auf obige Antworten -
Scala erweist sich in vielerlei Hinsicht als schneller als Python, aber es gibt einige triftige Gründe, warum Python immer beliebter wird als Scala, lassen Sie uns einige davon sehen -
Python für Apache Spark ist ziemlich einfach zu erlernen und zu verwenden. Dies ist jedoch nicht der einzige Grund, warum Pyspark eine bessere Wahl ist als Scala. Es gibt mehr.
Die Python-API für Spark ist im Cluster möglicherweise langsamer, aber am Ende können Datenwissenschaftler im Vergleich zu Scala viel mehr damit anfangen. Die Komplexität von Scala fehlt. Die Oberfläche ist einfach und umfassend.
Über die Lesbarkeit von Code, die Wartung und die Vertrautheit mit der Python-API für Apache Spark zu sprechen, ist weitaus besser als mit Scala.
Python enthält mehrere Bibliotheken zum maschinellen Lernen und zur Verarbeitung natürlicher Sprachen. Dies hilft bei der Datenanalyse und enthält auch Statistiken, die sehr ausgereift und erprobt sind. Zum Beispiel Numpy, Pandas, Scikit-Learn, Seaborn und Matplotlib.
Hinweis: Die meisten Datenwissenschaftler verwenden einen hybriden Ansatz, bei dem sie das Beste aus beiden APIs verwenden.
Schließlich erweist sich die Scala-Community für Programmierer oft als weniger hilfreich. Dies macht Python zu einem sehr wertvollen Lernen. Wenn Sie genug Erfahrung mit einer statisch typisierten Programmiersprache wie Java haben, können Sie aufhören, sich Sorgen zu machen, dass Sie Scala nicht ganz verwenden.
quelle