Ruft die aktuelle Anzahl der Partitionen eines DataFrames ab

74

Gibt es eine Möglichkeit, die aktuelle Anzahl der Partitionen eines DataFrame abzurufen? Ich habe das DataFrame-Javadoc (Spark 1.6) überprüft und keine Methode dafür gefunden, oder habe ich es einfach verpasst? (Im Fall von JavaRDD gibt es eine getNumPartitions () -Methode.)

kecso
quelle

Antworten:

147

Sie müssen getNumPartitions()die zugrunde liegende RDD des DataFrame aufrufen , z df.rdd.getNumPartitions(). Im Fall von Scala ist dies eine parameterlose Methode : df.rdd.getNumPartitions.

user4601931
quelle
3
minus der (), also nicht ganz richtig - zumindest nicht im SCALA-Modus
thebluephantom
3
Verursacht dies eine Umstellung ( teuer ) von DFauf RDD?
StephenBoesch
2
Das ist teuer
StephenBoesch
@javadba Haben Sie eine Antwort, die die RDD-API nicht anspricht?
user4601931
Nein, das tue ich nicht. Und es ist bedauerlich, dass Spark die Metadaten nicht besser im Sinne von Hive verwaltet. Ihre Antwort ist richtig, aber auch meine Beobachtung, dass dies teuer ist.
StephenBoesch
20

dataframe.rdd.partitions.sizeist eine andere Alternative neben df.rdd.getNumPartitions()oder df.rdd.length.

Lassen Sie mich dies anhand eines vollständigen Beispiels erklären ...

val x = (1 to 10).toList
val numberDF = x.toDF(“number”)
numberDF.rdd.partitions.size // => 4

Um zu beweisen, wie viele Partitionen wir oben haben ... speichern Sie diesen Datenrahmen als CSV

numberDF.write.csv(“/Users/Ram.Ghadiyaram/output/numbers”)

So werden die Daten auf den verschiedenen Partitionen getrennt.

Partition 00000: 1, 2
Partition 00001: 3, 4, 5
Partition 00002: 6, 7
Partition 00003: 8, 9, 10

Update:

@Hemanth hat im Kommentar eine gute Frage gestellt ... im Grunde genommen, warum die Anzahl der Partitionen im obigen Fall 4 beträgt

Kurze Antwort: Hängt von Fällen ab, in denen Sie ausführen. seit ich local [4] verwendet habe, habe ich 4 Partitionen bekommen.

Lange Antwort :

Ich habe das obige Programm auf meinem lokalen Computer ausgeführt und Master als lokales [4] verwendet, basierend darauf, dass es als 4 Partitionen verwendet wurde.

val spark = SparkSession.builder()
    .appName(this.getClass.getName)
    .config("spark.master", "local[4]").getOrCreate()

Wenn seine Funkenschale aus Mastergarn besteht, habe ich die Anzahl der Partitionen als 2 erhalten

Beispiel: spark-shell --master yarnund tippte dieselben Befehle erneut ein

scala> val x = (1 to 10).toList
x: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)


scala> val numberDF = x.toDF("number")
numberDF: org.apache.spark.sql.DataFrame = [number: int]

scala> numberDF.rdd.partitions.size
res0: Int = 2
  • hier ist 2 Standardparllelismus des Funkens
  • Basierend auf dem Hash-Partitionierer-Funken wird entschieden, wie viele Partitionen verteilt werden sollen. Wenn Sie in --master localund basierend auf Ihrem Runtime.getRuntime.availableProcessors() dh ausgeführt werden local[Runtime.getRuntime.availableProcessors()], wird versucht, diese Anzahl von Partitionen zuzuweisen. Wenn Ihre verfügbare Anzahl von Prozessoren 12 ist (dh local[Runtime.getRuntime.availableProcessors()])Sie eine Liste von 1 bis 10 haben, werden nur 10 Partitionen erstellt.

HINWEIS:

Wenn Sie sich auf einem 12-Core-Laptop befinden, auf dem ich ein Spark-Programm ausführe, und standardmäßig die Anzahl der Partitionen / Tasks die Anzahl aller verfügbaren Kerne ist, dh 12. Das bedeutet, local[*]oder s"local[${Runtime.getRuntime.availableProcessors()}]")in diesem Fall sind nur 10 Nummern vorhanden, sodass die Anzahl begrenzt wird bis 10

Unter Berücksichtigung all dieser Hinweise würde ich Ihnen empfehlen, es selbst zu versuchen

Ram Ghadiyaram
quelle
Danke für die tolle Antwort. Ich bin gespannt, warum eine Liste mit 10 Nummern bei der Konvertierung in einen DF in 4 Partitionen unterteilt wurde. Können Sie uns bitte eine Erklärung geben?
Hemanth
8

In RDD konvertieren und dann die Partitionslänge abrufen

DF.rdd.partitions.length
Bhargav Kosaraju
quelle
6
 val df = Seq(
  ("A", 1), ("B", 2), ("A", 3), ("C", 1)
).toDF("k", "v")

df.rdd.getNumPartitions
loneStar
quelle
Bitte lesen Sie diese Anleitung, um eine qualitativ hochwertige Antwort zu erhalten.
thewaywewere
0

Ein weiterer interessanter Weg, um die Anzahl der Partitionen zu ermitteln, ist die Transformation "MapPartitions verwenden". Beispielcode -

val x = (1 to 10).toList
val numberDF = x.toDF()
numberDF.rdd.mapPartitions(x => Iterator[Int](1)).sum()

Spark-Experten können sich gerne zu seiner Leistung äußern.

Shantanu Kher
quelle