Seltsames Verhalten beim Aufrufen einer Funktion außerhalb eines Abschlusses:
- Wenn sich die Funktion in einem Objekt befindet, funktioniert alles
- Wenn die Funktion in einer Klasse ist, erhalten Sie:
Aufgabe nicht serialisierbar: java.io.NotSerializableException: Testen
Das Problem ist, dass ich meinen Code in einer Klasse und nicht in einem Objekt benötige. Irgendeine Idee, warum das passiert? Ist ein Scala-Objekt serialisiert (Standard?)?
Dies ist ein funktionierendes Codebeispiel:
object working extends App {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
//calling function outside closure
val after = rddList.map(someFunc(_))
def someFunc(a:Int) = a+1
after.collect().map(println(_))
}
Dies ist das nicht funktionierende Beispiel:
object NOTworking extends App {
new testing().doIT
}
//adding extends Serializable wont help
class testing {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
def doIT = {
//again calling the fucntion someFunc
val after = rddList.map(someFunc(_))
//this will crash (spark lazy)
after.collect().map(println(_))
}
def someFunc(a:Int) = a+1
}
scala
serialization
apache-spark
typesafe
Nimrod007
quelle
quelle
Antworten:
RDDs erweitern die Serialisable-Schnittstelle , sodass Ihre Aufgabe nicht fehlschlägt. Dies bedeutet nicht, dass Sie eine
RDD
mit Spark serialisieren und vermeiden könnenNotSerializableException
Spark ist eine verteilte Computer-Engine und ihre Hauptabstraktion ist ein ausfallsicheres verteiltes Dataset ( RDD ), das als verteilte Sammlung angesehen werden kann. Grundsätzlich sind die RDD-Elemente auf die Knoten des Clusters verteilt, aber Spark abstrahiert dies vom Benutzer weg, sodass der Benutzer mit der RDD (Sammlung) interagieren kann, als wäre es eine lokale.
Nicht , um in zu vielen Details, aber wenn man verschiedene Transformationen auf einem RDD laufen (
map
,flatMap
,filter
und andere), Ihr Transformationscode (Verschluss) ist:Sie können dies natürlich lokal ausführen (wie in Ihrem Beispiel), aber alle diese Phasen (außer Versand über Netzwerk) treten weiterhin auf. [Auf diese Weise können Sie Fehler bereits vor der Bereitstellung in der Produktion erkennen.]
In Ihrem zweiten Fall rufen Sie eine Methode auf, die in der Klasse
testing
innerhalb der Map-Funktion definiert ist. Spark sieht das und da Methoden nicht alleine serialisiert werden können, versucht Spark, die gesamtetesting
Klasse zu serialisieren , damit der Code weiterhin funktioniert, wenn er in einer anderen JVM ausgeführt wird. Sie haben zwei Möglichkeiten:Entweder machen Sie Klassentests serialisierbar, sodass die gesamte Klasse von Spark serialisiert werden kann:
oder Sie machen eine
someFunc
Funktion anstelle einer Methode (Funktionen sind Objekte in Scala), damit Spark sie serialisieren kann:Ein ähnliches, aber nicht dasselbe Problem bei der Klassenserialisierung kann für Sie von Interesse sein, und Sie können es in dieser Präsentation zum Spark Summit 2013 nachlesen .
Als Randnotiz, können Sie umschreiben
rddList.map(someFunc(_))
zurddList.map(someFunc)
, sie sind genau gleich. Normalerweise wird die zweite bevorzugt, da sie weniger ausführlich und sauber zu lesen ist.BEARBEITEN (2015-03-15): SPARK-5307 führte SerializationDebugger ein und Spark 1.3.0 ist die erste Version, die es verwendet. Es fügt einer NotSerializableException einen Serialisierungspfad hinzu . Wenn eine NotSerializableException auftritt, besucht der Debugger das Objektdiagramm, um den Pfad zum Objekt zu finden, der nicht serialisiert werden kann, und erstellt Informationen, die dem Benutzer helfen, das Objekt zu finden.
Im Fall von OP wird Folgendes auf stdout gedruckt:
quelle
val test = new Test with Serializable
Gregas Antwort ist großartig, um zu erklären, warum der ursprüngliche Code nicht funktioniert, und um das Problem auf zwei Arten zu beheben. Diese Lösung ist jedoch nicht sehr flexibel; Betrachten Sie den Fall, in dem Ihr Abschluss einen Methodenaufruf für eine Nichtklasse enthält
Serializable
, über die Sie keine Kontrolle haben. Sie könnenSerializable
dieser Klasse weder das Tag hinzufügen noch die zugrunde liegende Implementierung ändern, um die Methode in eine Funktion umzuwandeln.Nilesh bietet hierfür eine großartige Problemumgehung , aber die Lösung kann sowohl präziser als auch allgemeiner gestaltet werden:
Dieser Funktions-Serializer kann dann verwendet werden, um Verschlüsse und Methodenaufrufe automatisch zu verpacken:
Diese Technik hat auch den Vorteil, dass für den Zugriff keine zusätzlichen Shark-Abhängigkeiten erforderlich sind
KryoSerializationWrapper
, da Twitter's Chill bereits von Core Spark übernommen wirdquelle
Vollständiger Vortrag zur vollständigen Erläuterung des Problems, das einen großartigen Paradigmenwechsel vorschlägt, um diese Serialisierungsprobleme zu vermeiden: https://github.com/samthebest/dump/blob/master/sams-scala-tutorial/serialization-exceptions-and-memory- Lecks-no-ws.md
Die am häufigsten gewählte Antwort schlägt im Grunde vor, ein ganzes Sprachmerkmal wegzuwerfen - das heißt, es werden keine Methoden mehr verwendet und nur noch Funktionen. In der Tat sollten Methoden zur funktionalen Programmierung in Klassen vermieden werden, aber ihre Umwandlung in Funktionen löst das Designproblem hier nicht (siehe obigen Link).
Als schnelle Lösung in dieser speziellen Situation können Sie einfach die
@transient
Anmerkung verwenden, um anzuweisen, dass nicht versucht werden soll, den fehlerhaften Wert zu serialisieren (hierSpark.ctx
handelt es sich um eine benutzerdefinierte Klasse, die nach der Benennung von OP nicht die von Spark ist):Sie können den Code auch so umstrukturieren, dass rddList woanders lebt, aber das ist auch böse.
Die Zukunft ist wahrscheinlich Sporen
In Zukunft wird Scala diese Dinge enthalten, die als "Sporen" bezeichnet werden und die es uns ermöglichen sollen, die Feinkornkontrolle zu steuern, was durch einen Verschluss genau hineingezogen wird und was nicht. Darüber hinaus sollte dies alle Fehler beim versehentlichen Abrufen nicht serialisierbarer Typen (oder unerwünschter Werte) in Kompilierungsfehler verwandeln und nicht jetzt, was schreckliche Laufzeitausnahmen / Speicherlecks darstellt.
http://docs.scala-lang.org/sips/pending/spores.html
Ein Tipp zur Kryo-Serialisierung
Stellen Sie bei der Verwendung von kyro sicher, dass eine Registrierung erforderlich ist. Dies bedeutet, dass Sie Fehler anstelle von Speicherlecks erhalten:
"Schließlich weiß ich, dass kryo kryo.setRegistrationOptional (true) hat, aber es fällt mir sehr schwer, herauszufinden, wie ich es verwenden soll. Wenn diese Option aktiviert ist, scheint kryo immer noch Ausnahmen auszulösen, wenn ich mich nicht registriert habe Klassen."
Strategie zur Anmeldung von Klassen bei kryo
Dies gibt Ihnen natürlich nur eine Kontrolle auf Typebene, keine Kontrolle auf Wertebene.
... weitere Ideen folgen.
quelle
Ich habe dieses Problem mit einem anderen Ansatz gelöst. Sie müssen lediglich die Objekte serialisieren, bevor Sie den Verschluss durchlaufen, und anschließend de-serialisieren. Dieser Ansatz funktioniert nur, auch wenn Ihre Klassen nicht serialisierbar sind, da Kryo hinter den Kulissen verwendet wird. Alles was Sie brauchen ist etwas Curry. ;)
Hier ist ein Beispiel, wie ich es gemacht habe:
Fühlen Sie sich frei, Blah so kompliziert zu machen, wie Sie möchten, Klasse, Begleitobjekt, verschachtelte Klassen, Verweise auf mehrere Bibliotheken von Drittanbietern.
KryoSerializationWrapper bezieht sich auf: https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala
quelle
KryoSerializationWrapper
Sie feststellen, dass Spark denkt, dass es tatsächlichjava.io.Serializable
- es serialisiert das Objekt einfach intern mit Kryo - schneller und einfacher ist. Und ich glaube nicht, dass es sich um eine statische Instanz handelt - es de-serialisiert nur den Wert, wenn value.apply () aufgerufen wird.Ich stand vor einem ähnlichen Problem, und was ich aus Gregas Antwort verstehe , ist
Ihre doIT- Methode versucht, eine someFunc (_) -Methode zu serialisieren. Da die Methode jedoch nicht serialisierbar ist, versucht sie, Klassentests zu serialisieren , die wiederum nicht serialisierbar sind.
Damit Ihr Code funktioniert, sollten Sie someFunc in der doIT- Methode definieren. Beispielsweise:
Und wenn mehrere Funktionen ins Bild kommen, sollten alle diese Funktionen für den übergeordneten Kontext verfügbar sein.
quelle
Ich bin nicht ganz sicher, ob dies für Scala gilt, aber in Java habe ich das Problem gelöst,
NotSerializableException
indem ich meinen Code so umgestaltet habe, dass der Abschluss nicht auf ein nicht serialisierbaresfinal
Feld zugegriffen hat.quelle
FileWriter
einfinal
Feld der äußeren Klasse ist, kannst du es nicht tun. AberFileWriter
aus einem aufgebaut seinString
oder eineFile
, die beideSerializable
. Überarbeiten Sie also Ihren Code, um einen lokalen CodeFileWriter
basierend auf dem Dateinamen der äußeren Klasse zu erstellen .Zu Ihrer Information, in Spark 2.4 werden wahrscheinlich viele von Ihnen auf dieses Problem stoßen. Die Kryo-Serialisierung ist besser geworden, aber in vielen Fällen können Sie spark.kryo.unsafe = true oder den naiven Kryo-Serializer nicht verwenden.
Versuchen Sie für eine schnelle Lösung Folgendes in Ihrer Spark-Konfiguration zu ändern
ODER
Ich ändere benutzerdefinierte RDD-Transformationen, auf die ich stoße oder die ich persönlich schreibe, indem ich explizite Broadcast-Variablen verwende und die neue integrierte Twitter-Chill-API verwende, um sie von
rdd.map(row =>
inrdd.mapPartitions(partition => {
Funktionen zu konvertieren .Beispiel
Alter (nicht großer) Weg
Alternativer (besserer) Weg
Auf diese neue Weise wird die Broadcast-Variable nur einmal pro Partition aufgerufen, was besser ist. Sie müssen weiterhin die Java-Serialisierung verwenden, wenn Sie keine Klassen registrieren.
quelle