Wie definiere ich die Partitionierung von DataFrame?

128

Ich habe begonnen, Spark SQL und DataFrames in Spark 1.4.0 zu verwenden. Ich möchte einen benutzerdefinierten Partitionierer in DataFrames in Scala definieren, sehe aber nicht, wie das geht.

Eine der Datentabellen, mit denen ich arbeite, enthält eine Liste von Transaktionen nach Konto, silimar zum folgenden Beispiel.

Account   Date       Type       Amount
1001    2014-04-01  Purchase    100.00
1001    2014-04-01  Purchase     50.00
1001    2014-04-05  Purchase     70.00
1001    2014-04-01  Payment    -150.00
1002    2014-04-01  Purchase     80.00
1002    2014-04-02  Purchase     22.00
1002    2014-04-04  Payment    -120.00
1002    2014-04-04  Purchase     60.00
1003    2014-04-02  Purchase    210.00
1003    2014-04-03  Purchase     15.00

Zumindest anfänglich werden die meisten Berechnungen zwischen den Transaktionen innerhalb eines Kontos durchgeführt. Daher möchte ich, dass die Daten so partitioniert werden, dass sich alle Transaktionen für ein Konto in derselben Spark-Partition befinden.

Aber ich sehe keinen Weg, dies zu definieren. Die DataFrame-Klasse verfügt über eine Methode namens 'repartition (Int)', mit der Sie die Anzahl der zu erstellenden Partitionen angeben können. Ich sehe jedoch keine verfügbare Methode zum Definieren eines benutzerdefinierten Partitionierers für einen DataFrame, wie sie für eine RDD angegeben werden kann.

Die Quelldaten werden in Parkett gespeichert. Ich habe gesehen, dass Sie beim Schreiben eines DataFrame in Parkett eine Spalte angeben können, nach der partitioniert werden soll. Vermutlich könnte ich Parkett anweisen, die Daten nach der Spalte "Konto" zu partitionieren. Aber es könnte Millionen von Konten geben, und wenn ich Parkett richtig verstehe, würde es für jedes Konto ein eigenes Verzeichnis erstellen, so dass dies nicht nach einer vernünftigen Lösung klang.

Gibt es eine Möglichkeit, Spark dazu zu bringen, diesen DataFrame so zu partitionieren, dass sich alle Daten für ein Konto in derselben Partition befinden?

Rechen
quelle
Überprüfen Sie diesen Link stackoverflow.com/questions/23127329/…
Abhishek Choudhary
Wenn Sie Parquet anweisen können, nach Konto zu partitionieren, können Sie wahrscheinlich nach partitionieren int(account/someInteger)und dadurch eine angemessene Anzahl von Konten pro Verzeichnis erhalten.
Paul
1
@ ABC: Ich habe diesen Link gesehen. partitionBy(Partitioner)Ich suchte nach dem Äquivalent dieser Methode, aber nach DataFrames anstelle von RDDs. Ich sehe jetzt, dass dies partitionBynur für Pair- RDDs verfügbar ist , nicht sicher, warum das so ist.
Rechen
@ Paul: Ich habe darüber nachgedacht, das zu tun, was du beschreibst. Ein paar Dinge hielten mich zurück:
Rechen
Fortsetzung .... (1) Das ist für "Parkett-Partitionierung". Ich konnte keine Dokumente finden, die besagen, dass die Spark-Partitionierung tatsächlich die Parkett-Partitionierung verwendet. (2) Wenn ich die Parkettdokumente verstehe, muss ich ein neues Feld "foo" definieren, dann hätte jedes Parkettverzeichnis einen Namen wie "foo = 123". Aber wenn ich eine Abfrage mit AccountID erstelle , wie würde Spark / hive / parquet wissen, dass es eine Verknüpfung zwischen foo und AccountID gibt ?
Rechen

Antworten:

177

Funke> = 2.3.0

SPARK-22614 macht die Bereichspartitionierung verfügbar .

val partitionedByRange = df.repartitionByRange(42, $"k")

partitionedByRange.explain
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k ASC NULLS FIRST], 42
// +- AnalysisBarrier Project [_1#2 AS k#5, _2#3 AS v#6]
// 
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- Project [_1#2 AS k#5, _2#3 AS v#6]
//    +- LocalRelation [_1#2, _2#3]
// 
// == Optimized Logical Plan ==
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- LocalRelation [k#5, v#6]
// 
// == Physical Plan ==
// Exchange rangepartitioning(k#5 ASC NULLS FIRST, 42)
// +- LocalTableScan [k#5, v#6]

SPARK-22389 macht die Partitionierung externer Formate in der Datenquellen-API v2 verfügbar .

Funke> = 1.6.0

In Spark> = 1.6 ist es möglich, die Partitionierung nach Spalten für die Abfrage und das Caching zu verwenden. Siehe: SPARK-11410 und SPARK-4849 unter Verwendung der folgenden repartitionMethode:

val df = Seq(
  ("A", 1), ("B", 2), ("A", 3), ("C", 1)
).toDF("k", "v")

val partitioned = df.repartition($"k")
partitioned.explain

// scala> df.repartition($"k").explain(true)
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Optimized Logical Plan ==
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Physical Plan ==
// TungstenExchange hashpartitioning(k#7,200), None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- Scan PhysicalRDD[_1#5,_2#6]

Im Gegensatz zu RDDsSpark Dataset(einschließlich Dataset[Row]aka DataFrame) kann derzeit kein benutzerdefinierter Partitionierer verwendet werden. Sie können dies normalerweise beheben, indem Sie eine künstliche Partitionierungsspalte erstellen, die Ihnen jedoch nicht die gleiche Flexibilität bietet.

Funke <1.6.0:

Eine Sache, die Sie tun können, ist, Eingabedaten vorab zu partitionieren, bevor Sie eine erstellen DataFrame

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.HashPartitioner

val schema = StructType(Seq(
  StructField("x", StringType, false),
  StructField("y", LongType, false),
  StructField("z", DoubleType, false)
))

val rdd = sc.parallelize(Seq(
  Row("foo", 1L, 0.5), Row("bar", 0L, 0.0), Row("??", -1L, 2.0),
  Row("foo", -1L, 0.0), Row("??", 3L, 0.6), Row("bar", -3L, 0.99)
))

val partitioner = new HashPartitioner(5) 

val partitioned = rdd.map(r => (r.getString(0), r))
  .partitionBy(partitioner)
  .values

val df = sqlContext.createDataFrame(partitioned, schema)

Da für die DataFrameErstellung aus einer RDDnur eine einfache Kartenphase erforderlich ist, sollte das vorhandene Partitionslayout beibehalten werden *:

assert(df.rdd.partitions == partitioned.partitions)

Auf die gleiche Weise können Sie vorhandene Partitionen neu partitionieren DataFrame:

sqlContext.createDataFrame(
  df.rdd.map(r => (r.getInt(1), r)).partitionBy(partitioner).values,
  df.schema
)

Es sieht also so aus, als wäre es nicht unmöglich. Die Frage bleibt, ob es überhaupt Sinn macht. Ich werde argumentieren, dass dies die meiste Zeit nicht der Fall ist:

  1. Die Neupartitionierung ist ein teurer Prozess. In einem typischen Szenario müssen die meisten Daten serialisiert, gemischt und deserialisiert werden. Andererseits ist die Anzahl der Vorgänge, die von vorpartitionierten Daten profitieren können, relativ gering und wird weiter eingeschränkt, wenn die interne API nicht dafür ausgelegt ist, diese Eigenschaft zu nutzen.

    • tritt in einigen Szenarien bei, würde aber eine interne Unterstützung erfordern,
    • Fensterfunktionsaufrufe mit passendem Partitionierer. Wie oben, beschränkt auf eine einzelne Fensterdefinition. Es ist jedoch bereits intern partitioniert, sodass die Vorpartitionierung möglicherweise redundant ist.
    • einfache Aggregationen mit GROUP BY- es ist möglich, den Speicherbedarf der temporären Puffer ** zu reduzieren, aber die Gesamtkosten sind viel höher. Mehr oder weniger äquivalent zu groupByKey.mapValues(_.reduce)(aktuelles Verhalten) vs reduceByKey(Vorpartitionierung). In der Praxis unwahrscheinlich.
    • Datenkomprimierung mit SqlContext.cacheTable. Da es so aussieht, als würde es eine Lauflängencodierung verwenden, OrderedRDDFunctions.repartitionAndSortWithinPartitionskönnte das Anwenden das Komprimierungsverhältnis verbessern.
  2. Die Leistung hängt stark von der Verteilung der Schlüssel ab. Wenn es schief ist, führt dies zu einer suboptimalen Ressourcennutzung. Im schlimmsten Fall ist es unmöglich, den Auftrag überhaupt zu beenden.

  3. Ein wichtiger Punkt bei der Verwendung einer deklarativen API auf hoher Ebene besteht darin, sich von den Implementierungsdetails auf niedriger Ebene zu isolieren. Wie bereits von @dwysakowicz und @RomiKuntsman erwähnt, ist eine Optimierung eine Aufgabe des Catalyst Optimizer . Es ist ein ziemlich raffiniertes Tier, und ich bezweifle wirklich, dass Sie das leicht verbessern können, ohne viel tiefer in seine Innereien einzutauchen.

Verwandte konzepte

Partitionierung mit JDBC-Quellen :

JDBC-Datenquellen unterstützen predicatesArgumente . Es kann wie folgt verwendet werden:

sqlContext.read.jdbc(url, table, Array("foo = 1", "foo = 3"), props)

Es wird eine einzelne JDBC-Partition pro Prädikat erstellt. Beachten Sie, dass in der resultierenden Tabelle Duplikate angezeigt werden, wenn Sätze, die mit einzelnen Prädikaten erstellt wurden, nicht disjunkt sind.

partitionByMethode inDataFrameWriter :

Spark DataFrameWriterbietet eine partitionByMethode, mit der Daten beim Schreiben "partitioniert" werden können. Es trennt Daten beim Schreiben unter Verwendung der bereitgestellten Spalten

val df = Seq(
  ("foo", 1.0), ("bar", 2.0), ("foo", 1.5), ("bar", 2.6)
).toDF("k", "v")

df.write.partitionBy("k").json("/tmp/foo.json")

Dies ermöglicht das Herunterdrücken von Prädikaten beim Lesen für Abfragen basierend auf dem Schlüssel:

val df1 = sqlContext.read.schema(df.schema).json("/tmp/foo.json")
df1.where($"k" === "bar")

aber es ist nicht gleichbedeutend mit DataFrame.repartition. Insbesondere Aggregationen wie:

val cnts = df1.groupBy($"k").sum()

wird noch erfordern TungstenExchange:

cnts.explain

// == Physical Plan ==
// TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Final,isDistinct=false)], output=[k#90,sum(v)#93])
// +- TungstenExchange hashpartitioning(k#90,200), None
//    +- TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Partial,isDistinct=false)], output=[k#90,sum#99])
//       +- Scan JSONRelation[k#90,v#91] InputPaths: file:/tmp/foo.json

bucketByMethode inDataFrameWriter (Spark> = 2.0):

bucketByhat ähnliche Anwendungen wie partitionBy, ist jedoch nur für Tabellen verfügbar ( saveAsTable). Bucketing-Informationen können zur Optimierung von Joins verwendet werden:

// Temporarily disable broadcast joins
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

df.write.bucketBy(42, "k").saveAsTable("df1")
val df2 = Seq(("A", -1.0), ("B", 2.0)).toDF("k", "v2")
df2.write.bucketBy(42, "k").saveAsTable("df2")

// == Physical Plan ==
// *Project [k#41, v#42, v2#47]
// +- *SortMergeJoin [k#41], [k#46], Inner
//    :- *Sort [k#41 ASC NULLS FIRST], false, 0
//    :  +- *Project [k#41, v#42]
//    :     +- *Filter isnotnull(k#41)
//    :        +- *FileScan parquet default.df1[k#41,v#42] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df1], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v:int>
//    +- *Sort [k#46 ASC NULLS FIRST], false, 0
//       +- *Project [k#46, v2#47]
//          +- *Filter isnotnull(k#46)
//             +- *FileScan parquet default.df2[k#46,v2#47] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df2], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v2:double>

* Mit Partitionslayout meine ich nur eine Datenverteilung. partitionedRDD hat keinen Partitionierer mehr. ** Vorausgesetzt, keine frühe Projektion. Wenn die Aggregation nur eine kleine Teilmenge von Spalten abdeckt, gibt es wahrscheinlich überhaupt keinen Gewinn.

null323
quelle
@bychance Ja und nein. Das Datenlayout bleibt erhalten, aber AFAIK bietet Ihnen keine Vorteile wie das Bereinigen von Partitionen.
Null323
@ zero323 Danke, gibt es eine Möglichkeit, die Partitionszuordnung der Parkettdatei zu überprüfen, um zu überprüfen, ob df.save.write tatsächlich das Layout speichert? Und wenn ich df.repartition ("A") mache, dann mache ich df.write.repartitionBy ("B"), die physische Ordnerstruktur wird durch B partitioniert, und in jedem B-Wert-Ordner wird die Partition weiterhin beibehalten EIN?
Möglicherweise
2
@bychance DataFrameWriter.partitionByist logischerweise nicht dasselbe wie DataFrame.repartition. Früher mischt nicht, sondern trennt einfach die Ausgabe. In Bezug auf die erste Frage werden Daten pro Partition gespeichert und es gibt kein Mischen. Sie können dies leicht überprüfen, indem Sie einzelne Dateien lesen. Aber Spark allein kann nicht wissen, ob Sie das wirklich wollen.
Null 323
11

In Spark <1.6 Wenn Sie ein erstellen HiveContext, nicht das einfache alte SqlContext, können Sie beispielsweise HiveQL verwenden DISTRIBUTE BY colX... (stellt sicher, dass jeder der N Reduzierer nicht überlappende Bereiche von x erhält) & CLUSTER BY colX...(Verknüpfung für Verteilen nach und Sortieren nach);

df.registerTempTable("partitionMe")
hiveCtx.sql("select * from partitionMe DISTRIBUTE BY accountId SORT BY accountId, date")

Ich bin mir nicht sicher, wie dies zur Spark DF-API passt. Diese Schlüsselwörter werden im normalen SqlContext nicht unterstützt (beachten Sie, dass Sie keinen Hive-Metaspeicher benötigen, um den HiveContext verwenden zu können).

BEARBEITEN : Spark 1.6+ hat dies jetzt in der nativen DataFrame-API

NightWolf
quelle
1
Werden die Partitionen beim Speichern des Datenrahmens beibehalten?
Sim
Wie steuern Sie, wie viele Partitionen Sie im Beispiel hive ql haben können? Beispiel: Im Paar-RDD-Ansatz können Sie dies tun, um 5 Partitionen zu erstellen: val partitioner = new HashPartitioner (5)
Minnie
ok, gefundene Antwort, es kann so gemacht werden: sqlContext.setConf ("spark.sql.shuffle.partitions", "5") Ich konnte den vorherigen Kommentar nicht bearbeiten, da ich das 5-Minuten-Limit verpasst habe
Minnie
7

Um mit einer Antwort zu beginnen :) - Das kannst du nicht

Ich bin kein Experte, aber soweit ich DataFrames verstehe, sind sie nicht gleich rdd und DataFrame hat keinen Partitionierer.

Im Allgemeinen besteht die Idee von DataFrame darin, eine andere Abstraktionsebene bereitzustellen, die solche Probleme selbst behandelt. Die Abfragen in DataFrame werden in einen logischen Plan übersetzt, der weiter in Operationen auf RDDs übersetzt wird. Die von Ihnen vorgeschlagene Partitionierung wird wahrscheinlich automatisch angewendet oder sollte es zumindest sein.

Wenn Sie SparkSQL nicht vertrauen, dass es einen optimalen Job bietet, können Sie DataFrame jederzeit in RDD [Zeile] umwandeln, wie in den Kommentaren vorgeschlagen.

Dawid Wysakowicz
quelle
7

Verwenden Sie den von: zurückgegebenen DataFrame:

yourDF.orderBy(account)

Es gibt keine explizite Möglichkeit, partitionByeinen DataFrame nur auf einem PairRDD zu verwenden. Wenn Sie jedoch einen DataFrame sortieren, wird dieser in seinem LogicalPlan verwendet, und dies ist hilfreich, wenn Sie Berechnungen für jedes Konto durchführen müssen.

Ich bin gerade auf das gleiche Problem gestoßen, mit einem Datenrahmen, den ich nach Konto partitionieren möchte. Ich gehe davon aus, dass Sie, wenn Sie sagen, dass die Daten so partitioniert werden sollen, dass sich alle Transaktionen für ein Konto in derselben Spark-Partition befinden, dies für Skalierung und Leistung wünschen, Ihr Code jedoch nicht davon abhängt (wie bei der Verwendung) mapPartitions()etc), richtig?

Romi Kuntsman
quelle
3
Was ist, wenn Ihr Code davon abhängt, weil Sie mapPartitions verwenden?
NightWolf
2
Sie können den DataFrame in einen RDD konvertieren und dann partitionieren (z. B. mit aggregatByKey () und einen benutzerdefinierten Partitionierer übergeben)
Romi Kuntsman
5

Ich konnte dies mit RDD tun. Aber ich weiß nicht, ob dies eine akzeptable Lösung für Sie ist. Sobald Sie den DF als RDD verfügbar haben, können Sie repartitionAndSortWithinPartitionseine benutzerdefinierte Neupartitionierung von Daten durchführen.

Hier ist ein Beispiel, das ich verwendet habe:

class DatePartitioner(partitions: Int) extends Partitioner {

  override def getPartition(key: Any): Int = {
    val start_time: Long = key.asInstanceOf[Long]
    Objects.hash(Array(start_time)) % partitions
  }

  override def numPartitions: Int = partitions
}

myRDD
  .repartitionAndSortWithinPartitions(new DatePartitioner(24))
  .map { v => v._2 }
  .toDF()
  .write.mode(SaveMode.Overwrite)
Entwickler
quelle