Apache Spark: Auswirkungen der Neupartitionierung, Sortierung und Zwischenspeicherung auf einen Join

10

Ich untersuche das Verhalten von Spark, wenn ich einen Tisch mit sich selbst verbinde. Ich benutze Databricks.

Mein Dummy-Szenario ist:

  1. Lesen Sie eine externe Tabelle als Datenrahmen A (zugrunde liegende Dateien sind im Delta-Format)

  2. Definieren Sie Datenrahmen B als Datenrahmen A, wobei nur bestimmte Spalten ausgewählt sind

  3. Verbinden Sie die Datenrahmen A und B in Spalte1 und Spalte2

(Ja, es macht nicht viel Sinn, ich experimentiere nur, um die zugrunde liegende Mechanik von Spark zu verstehen.)

a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))

b = a.select("column1", "column2", "columnA")

c= a.join(b, how="left", on = ["column1", "column2"])

Mein erster Versuch war, den Code so auszuführen, wie er ist (Versuch 1). Ich habe dann versucht, neu zu partitionieren und zwischenzuspeichern (Versuch 2)

a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
.repartition(col("column1"), col("column2")).cache()

Schließlich habe ich neu partitioniert, sortiert und zwischengespeichert

 a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
.repartition(col("column1"), col("column2")).sortWithinPartitions(col("column1"), col("column2")).cache()

Die jeweils erzeugten Dags sind wie beigefügt.

Meine Fragen sind:

  1. Warum in Versuch 1 die Tabelle zwischengespeichert zu sein scheint, obwohl das Zwischenspeichern nicht explizit angegeben wurde.

  2. Warum auf InMemoreTableScan immer ein anderer Knoten dieses Typs folgt.

  3. Warum scheint in Versuch 3 das Caching auf zwei Stufen zu erfolgen?

  4. Warum in Versuch 3 WholeStageCodegen einem (und nur einem) InMemoreTableScan folgt.

Versuch 1

Versuch 2

Geben Sie hier die Bildbeschreibung ein

Dawid
quelle
Ich vermute, dass der DataFrame-Reader Daten automatisch zwischenspeichert, wenn die Quelle eine externe Tabelle ist. Ich habe eine ähnliche Situation, in der ich Daten aus einer Datenbanktabelle lese, während sie heruntergeladen werden können. Auf der Registerkarte "SQL" unter "Benutzeroberfläche für Anwendungsdetails" wird die Anzahl der heruntergeladenen Zeilen angezeigt, es wurde jedoch noch keine Datei am angegebenen Speicherort gespeichert . Ich denke, es kennt die Anzahl, weil es die Daten irgendwo zwischengespeichert hat und das ist, was auf der DAG erscheint. Wenn Sie Daten lokal aus einer Textdatei lesen, wird der Cache-Status nicht angezeigt.
Salim

Antworten:

4

Was Sie in diesen drei Plänen beobachten, ist eine Mischung aus DataBricks-Laufzeit und Spark.

Während der Ausführung von DataBricks Runtime 3.3+ wird das Caching zunächst automatisch für alle Parkettdateien aktiviert. Entsprechende Konfiguration dafür: spark.databricks.io.cache.enabled true

Bei Ihrer zweiten Abfrage wird InMemoryTableScan zweimal ausgeführt, da Spark beim Aufruf des Joins versucht hat, Datensatz A und Datensatz B parallel zu berechnen. Unter der Annahme, dass verschiedenen Executoren die oben genannten Aufgaben zugewiesen wurden, müssen beide die Tabelle aus dem (DataBricks) -Cache scannen.

Beim dritten bezieht sich InMemoryTableScan nicht auf das Caching an sich. Dies bedeutet lediglich, dass der gebildete Plan-Katalysator die zwischengespeicherte Tabelle mehrmals scannt.

PS: Ich kann den Punkt 4 nicht visualisieren :)

Ashvjit Singh
quelle