Ich habe begonnen, Spark SQL und DataFrames in Spark 1.4.0 zu verwenden. Ich möchte einen benutzerdefinierten Partitionierer in DataFrames in Scala definieren, sehe aber nicht, wie das geht.
Eine der Datentabellen, mit denen ich arbeite, enthält eine Liste von Transaktionen nach Konto, silimar zum folgenden Beispiel.
Account Date Type Amount
1001 2014-04-01 Purchase 100.00
1001 2014-04-01 Purchase 50.00
1001 2014-04-05 Purchase 70.00
1001 2014-04-01 Payment -150.00
1002 2014-04-01 Purchase 80.00
1002 2014-04-02 Purchase 22.00
1002 2014-04-04 Payment -120.00
1002 2014-04-04 Purchase 60.00
1003 2014-04-02 Purchase 210.00
1003 2014-04-03 Purchase 15.00
Zumindest anfänglich werden die meisten Berechnungen zwischen den Transaktionen innerhalb eines Kontos durchgeführt. Daher möchte ich, dass die Daten so partitioniert werden, dass sich alle Transaktionen für ein Konto in derselben Spark-Partition befinden.
Aber ich sehe keinen Weg, dies zu definieren. Die DataFrame-Klasse verfügt über eine Methode namens 'repartition (Int)', mit der Sie die Anzahl der zu erstellenden Partitionen angeben können. Ich sehe jedoch keine verfügbare Methode zum Definieren eines benutzerdefinierten Partitionierers für einen DataFrame, wie sie für eine RDD angegeben werden kann.
Die Quelldaten werden in Parkett gespeichert. Ich habe gesehen, dass Sie beim Schreiben eines DataFrame in Parkett eine Spalte angeben können, nach der partitioniert werden soll. Vermutlich könnte ich Parkett anweisen, die Daten nach der Spalte "Konto" zu partitionieren. Aber es könnte Millionen von Konten geben, und wenn ich Parkett richtig verstehe, würde es für jedes Konto ein eigenes Verzeichnis erstellen, so dass dies nicht nach einer vernünftigen Lösung klang.
Gibt es eine Möglichkeit, Spark dazu zu bringen, diesen DataFrame so zu partitionieren, dass sich alle Daten für ein Konto in derselben Partition befinden?
int(account/someInteger)
und dadurch eine angemessene Anzahl von Konten pro Verzeichnis erhalten.partitionBy(Partitioner)
Ich suchte nach dem Äquivalent dieser Methode, aber nach DataFrames anstelle von RDDs. Ich sehe jetzt, dass diespartitionBy
nur für Pair- RDDs verfügbar ist , nicht sicher, warum das so ist.Antworten:
Funke> = 2.3.0
SPARK-22614 macht die Bereichspartitionierung verfügbar .
SPARK-22389 macht die Partitionierung externer Formate in der Datenquellen-API v2 verfügbar .
Funke> = 1.6.0
In Spark> = 1.6 ist es möglich, die Partitionierung nach Spalten für die Abfrage und das Caching zu verwenden. Siehe: SPARK-11410 und SPARK-4849 unter Verwendung der folgenden
repartition
Methode:Im Gegensatz zu
RDDs
SparkDataset
(einschließlichDataset[Row]
akaDataFrame
) kann derzeit kein benutzerdefinierter Partitionierer verwendet werden. Sie können dies normalerweise beheben, indem Sie eine künstliche Partitionierungsspalte erstellen, die Ihnen jedoch nicht die gleiche Flexibilität bietet.Funke <1.6.0:
Eine Sache, die Sie tun können, ist, Eingabedaten vorab zu partitionieren, bevor Sie eine erstellen
DataFrame
Da für die
DataFrame
Erstellung aus einerRDD
nur eine einfache Kartenphase erforderlich ist, sollte das vorhandene Partitionslayout beibehalten werden *:Auf die gleiche Weise können Sie vorhandene Partitionen neu partitionieren
DataFrame
:Es sieht also so aus, als wäre es nicht unmöglich. Die Frage bleibt, ob es überhaupt Sinn macht. Ich werde argumentieren, dass dies die meiste Zeit nicht der Fall ist:
Die Neupartitionierung ist ein teurer Prozess. In einem typischen Szenario müssen die meisten Daten serialisiert, gemischt und deserialisiert werden. Andererseits ist die Anzahl der Vorgänge, die von vorpartitionierten Daten profitieren können, relativ gering und wird weiter eingeschränkt, wenn die interne API nicht dafür ausgelegt ist, diese Eigenschaft zu nutzen.
GROUP BY
- es ist möglich, den Speicherbedarf der temporären Puffer ** zu reduzieren, aber die Gesamtkosten sind viel höher. Mehr oder weniger äquivalent zugroupByKey.mapValues(_.reduce)
(aktuelles Verhalten) vsreduceByKey
(Vorpartitionierung). In der Praxis unwahrscheinlich.SqlContext.cacheTable
. Da es so aussieht, als würde es eine Lauflängencodierung verwenden,OrderedRDDFunctions.repartitionAndSortWithinPartitions
könnte das Anwenden das Komprimierungsverhältnis verbessern.Die Leistung hängt stark von der Verteilung der Schlüssel ab. Wenn es schief ist, führt dies zu einer suboptimalen Ressourcennutzung. Im schlimmsten Fall ist es unmöglich, den Auftrag überhaupt zu beenden.
Verwandte konzepte
Partitionierung mit JDBC-Quellen :
JDBC-Datenquellen unterstützen
predicates
Argumente . Es kann wie folgt verwendet werden:Es wird eine einzelne JDBC-Partition pro Prädikat erstellt. Beachten Sie, dass in der resultierenden Tabelle Duplikate angezeigt werden, wenn Sätze, die mit einzelnen Prädikaten erstellt wurden, nicht disjunkt sind.
partitionBy
Methode inDataFrameWriter
:Spark
DataFrameWriter
bietet einepartitionBy
Methode, mit der Daten beim Schreiben "partitioniert" werden können. Es trennt Daten beim Schreiben unter Verwendung der bereitgestellten SpaltenDies ermöglicht das Herunterdrücken von Prädikaten beim Lesen für Abfragen basierend auf dem Schlüssel:
aber es ist nicht gleichbedeutend mit
DataFrame.repartition
. Insbesondere Aggregationen wie:wird noch erfordern
TungstenExchange
:bucketBy
Methode inDataFrameWriter
(Spark> = 2.0):bucketBy
hat ähnliche Anwendungen wiepartitionBy
, ist jedoch nur für Tabellen verfügbar (saveAsTable
). Bucketing-Informationen können zur Optimierung von Joins verwendet werden:* Mit Partitionslayout meine ich nur eine Datenverteilung.
partitioned
RDD hat keinen Partitionierer mehr. ** Vorausgesetzt, keine frühe Projektion. Wenn die Aggregation nur eine kleine Teilmenge von Spalten abdeckt, gibt es wahrscheinlich überhaupt keinen Gewinn.quelle
DataFrameWriter.partitionBy
ist logischerweise nicht dasselbe wieDataFrame.repartition
. Früher mischt nicht, sondern trennt einfach die Ausgabe. In Bezug auf die erste Frage werden Daten pro Partition gespeichert und es gibt kein Mischen. Sie können dies leicht überprüfen, indem Sie einzelne Dateien lesen. Aber Spark allein kann nicht wissen, ob Sie das wirklich wollen.In Spark <1.6 Wenn Sie ein erstellen
HiveContext
, nicht das einfache alteSqlContext
, können Sie beispielsweise HiveQL verwendenDISTRIBUTE BY colX...
(stellt sicher, dass jeder der N Reduzierer nicht überlappende Bereiche von x erhält) &CLUSTER BY colX...
(Verknüpfung für Verteilen nach und Sortieren nach);Ich bin mir nicht sicher, wie dies zur Spark DF-API passt. Diese Schlüsselwörter werden im normalen SqlContext nicht unterstützt (beachten Sie, dass Sie keinen Hive-Metaspeicher benötigen, um den HiveContext verwenden zu können).
BEARBEITEN : Spark 1.6+ hat dies jetzt in der nativen DataFrame-API
quelle
Um mit einer Antwort zu beginnen :) - Das kannst du nicht
Ich bin kein Experte, aber soweit ich DataFrames verstehe, sind sie nicht gleich rdd und DataFrame hat keinen Partitionierer.
Im Allgemeinen besteht die Idee von DataFrame darin, eine andere Abstraktionsebene bereitzustellen, die solche Probleme selbst behandelt. Die Abfragen in DataFrame werden in einen logischen Plan übersetzt, der weiter in Operationen auf RDDs übersetzt wird. Die von Ihnen vorgeschlagene Partitionierung wird wahrscheinlich automatisch angewendet oder sollte es zumindest sein.
Wenn Sie SparkSQL nicht vertrauen, dass es einen optimalen Job bietet, können Sie DataFrame jederzeit in RDD [Zeile] umwandeln, wie in den Kommentaren vorgeschlagen.
quelle
Verwenden Sie den von: zurückgegebenen DataFrame:
Es gibt keine explizite Möglichkeit,
partitionBy
einen DataFrame nur auf einem PairRDD zu verwenden. Wenn Sie jedoch einen DataFrame sortieren, wird dieser in seinem LogicalPlan verwendet, und dies ist hilfreich, wenn Sie Berechnungen für jedes Konto durchführen müssen.Ich bin gerade auf das gleiche Problem gestoßen, mit einem Datenrahmen, den ich nach Konto partitionieren möchte. Ich gehe davon aus, dass Sie, wenn Sie sagen, dass die Daten so partitioniert werden sollen, dass sich alle Transaktionen für ein Konto in derselben Spark-Partition befinden, dies für Skalierung und Leistung wünschen, Ihr Code jedoch nicht davon abhängt (wie bei der Verwendung)
mapPartitions()
etc), richtig?quelle
Ich konnte dies mit RDD tun. Aber ich weiß nicht, ob dies eine akzeptable Lösung für Sie ist. Sobald Sie den DF als RDD verfügbar haben, können Sie
repartitionAndSortWithinPartitions
eine benutzerdefinierte Neupartitionierung von Daten durchführen.Hier ist ein Beispiel, das ich verwendet habe:
quelle