Ich untersuche das Verhalten von Spark, wenn ich einen Tisch mit sich selbst verbinde. Ich benutze Databricks.
Mein Dummy-Szenario ist:
Lesen Sie eine externe Tabelle als Datenrahmen A (zugrunde liegende Dateien sind im Delta-Format)
Definieren Sie Datenrahmen B als Datenrahmen A, wobei nur bestimmte Spalten ausgewählt sind
Verbinden Sie die Datenrahmen A und B in Spalte1 und Spalte2
(Ja, es macht nicht viel Sinn, ich experimentiere nur, um die zugrunde liegende Mechanik von Spark zu verstehen.)
a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
b = a.select("column1", "column2", "columnA")
c= a.join(b, how="left", on = ["column1", "column2"])
Mein erster Versuch war, den Code so auszuführen, wie er ist (Versuch 1). Ich habe dann versucht, neu zu partitionieren und zwischenzuspeichern (Versuch 2)
a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
.repartition(col("column1"), col("column2")).cache()
Schließlich habe ich neu partitioniert, sortiert und zwischengespeichert
a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
.repartition(col("column1"), col("column2")).sortWithinPartitions(col("column1"), col("column2")).cache()
Die jeweils erzeugten Dags sind wie beigefügt.
Meine Fragen sind:
Warum in Versuch 1 die Tabelle zwischengespeichert zu sein scheint, obwohl das Zwischenspeichern nicht explizit angegeben wurde.
Warum auf InMemoreTableScan immer ein anderer Knoten dieses Typs folgt.
Warum scheint in Versuch 3 das Caching auf zwei Stufen zu erfolgen?
Warum in Versuch 3 WholeStageCodegen einem (und nur einem) InMemoreTableScan folgt.
Antworten:
Was Sie in diesen drei Plänen beobachten, ist eine Mischung aus DataBricks-Laufzeit und Spark.
Während der Ausführung von DataBricks Runtime 3.3+ wird das Caching zunächst automatisch für alle Parkettdateien aktiviert. Entsprechende Konfiguration dafür:
spark.databricks.io.cache.enabled true
Bei Ihrer zweiten Abfrage wird InMemoryTableScan zweimal ausgeführt, da Spark beim Aufruf des Joins versucht hat, Datensatz A und Datensatz B parallel zu berechnen. Unter der Annahme, dass verschiedenen Executoren die oben genannten Aufgaben zugewiesen wurden, müssen beide die Tabelle aus dem (DataBricks) -Cache scannen.
Beim dritten bezieht sich InMemoryTableScan nicht auf das Caching an sich. Dies bedeutet lediglich, dass der gebildete Plan-Katalysator die zwischengespeicherte Tabelle mehrmals scannt.
PS: Ich kann den Punkt 4 nicht visualisieren :)
quelle