Ich habe einen Datenrahmen mit folgendem Code:
def test(lat: Double, lon: Double) = {
println(s"testing ${lat / lon}")
Map("one" -> "one", "two" -> "two")
}
val testUDF = udf(test _)
df.withColumn("test", testUDF(col("lat"), col("lon")))
.withColumn("test1", col("test.one"))
.withColumn("test2", col("test.two"))
Als ich nun die Protokolle überprüfte, stellte ich fest, dass die UDF für jede Zeile dreimal ausgeführt wird. Wenn ich "test3" aus einer "test.three" -Spalte hinzufüge, wird die UDF erneut ausgeführt.
Kann mir jemand erklären warum?
Kann dies richtig vermieden werden (ohne den Datenrahmen zwischenzuspeichern, nachdem "test" hinzugefügt wurde, auch wenn dies funktioniert)?
scala
apache-spark
apache-spark-sql
Rolintocour
quelle
quelle
Map
und keine Struktur. Anstatt eine Map zurückzugeben, ist die UDFtest
zwar eine Fallklasse wie Test ( eine Zeichenfolge , zwei: Zeichenfolge ), dann aber eine Struktur, aber es gibt immer so viele Ausführungen der UDF.Antworten:
Wenn Sie mehrere Aufrufe eines udf vermeiden möchten (was insbesondere dann nützlich ist, wenn das udf ein Engpass in Ihrem Job ist), können Sie dies wie folgt tun:
Grundsätzlich teilen Sie Spark mit, dass Ihre Funktion nicht deterministisch ist, und Spark stellt jetzt sicher, dass sie nur einmal aufgerufen wird, da es nicht sicher ist, sie mehrmals aufzurufen (jeder Aufruf kann möglicherweise ein anderes Ergebnis zurückgeben).
Beachten Sie auch, dass dieser Trick nicht kostenlos ist. Auf diese Weise setzen Sie dem Optimierer einige Einschränkungen auf. Ein Nebeneffekt davon ist beispielsweise, dass der Spark-Optimierer keine Filter durch Ausdrücke schiebt, die nicht deterministisch sind, sodass Sie für das Optimum verantwortlich sind Position der Filter in Ihrer Abfrage.
quelle
asNondeterministic
zwingt das die UDF, nur einmal auszuführen. Mit derexplode(array(myUdf($"id")))
Lösung wird es immer noch zweimal ausgeführt.