Spark: UDF wurde viele Male ausgeführt

9

Ich habe einen Datenrahmen mit folgendem Code:

def test(lat: Double, lon: Double) = {
  println(s"testing ${lat / lon}")
  Map("one" -> "one", "two" -> "two")
}

val testUDF = udf(test _)

df.withColumn("test", testUDF(col("lat"), col("lon")))
  .withColumn("test1", col("test.one"))
  .withColumn("test2", col("test.two"))

Als ich nun die Protokolle überprüfte, stellte ich fest, dass die UDF für jede Zeile dreimal ausgeführt wird. Wenn ich "test3" aus einer "test.three" -Spalte hinzufüge, wird die UDF erneut ausgeführt.

Kann mir jemand erklären warum?

Kann dies richtig vermieden werden (ohne den Datenrahmen zwischenzuspeichern, nachdem "test" hinzugefügt wurde, auch wenn dies funktioniert)?

Rolintocour
quelle
Was meinst du? Sie rufen die Testfunktion dreimal auf. Deshalb wird es dreimal ausgeführt. Ich bin mir nicht sicher, warum du es zu einer UDF machst. Warum nicht einfach die Karte zu einem Wert machen?
user4601931
Dies ist nur ein Beispiel, um das Verhalten von Funken zu zeigen. Für mich ist "test" eine neue Spalte, die eine Struktur enthält. Wenn Sie dann auf einen Teil der Struktur zugreifen, sollte die UDF nicht erneut ausgeführt werden. Wie irre ich mich
Rolintocour
Ich habe versucht, das Schema zu drucken, der Datentyp von "test" ist Mapund keine Struktur. Anstatt eine Map zurückzugeben, ist die UDF testzwar eine Fallklasse wie Test ( eine Zeichenfolge , zwei: Zeichenfolge ), dann aber eine Struktur, aber es gibt immer so viele Ausführungen der UDF.
Rolintocour
Das Caching sollte gemäß dieser Antwort funktionieren
Raphael Roth

Antworten:

5

Wenn Sie mehrere Aufrufe eines udf vermeiden möchten (was insbesondere dann nützlich ist, wenn das udf ein Engpass in Ihrem Job ist), können Sie dies wie folgt tun:

val testUDF = udf(test _).asNondeterministic()

Grundsätzlich teilen Sie Spark mit, dass Ihre Funktion nicht deterministisch ist, und Spark stellt jetzt sicher, dass sie nur einmal aufgerufen wird, da es nicht sicher ist, sie mehrmals aufzurufen (jeder Aufruf kann möglicherweise ein anderes Ergebnis zurückgeben).

Beachten Sie auch, dass dieser Trick nicht kostenlos ist. Auf diese Weise setzen Sie dem Optimierer einige Einschränkungen auf. Ein Nebeneffekt davon ist beispielsweise, dass der Spark-Optimierer keine Filter durch Ausdrücke schiebt, die nicht deterministisch sind, sodass Sie für das Optimum verantwortlich sind Position der Filter in Ihrer Abfrage.

David Vrba
quelle
nett! Diese Antwort gehört auch hierher: stackoverflow.com/questions/40320563/…
Raphael Roth
In meinem Fall asNondeterministiczwingt das die UDF, nur einmal auszuführen. Mit der explode(array(myUdf($"id")))Lösung wird es immer noch zweimal ausgeführt.
Rolintocour