Aufgabe nicht serialisierbar: java.io.NotSerializableException beim Aufruf der Funktion außerhalb des Abschlusses nur für Klassen, nicht für Objekte

224

Seltsames Verhalten beim Aufrufen einer Funktion außerhalb eines Abschlusses:

  • Wenn sich die Funktion in einem Objekt befindet, funktioniert alles
  • Wenn die Funktion in einer Klasse ist, erhalten Sie:

Aufgabe nicht serialisierbar: java.io.NotSerializableException: Testen

Das Problem ist, dass ich meinen Code in einer Klasse und nicht in einem Objekt benötige. Irgendeine Idee, warum das passiert? Ist ein Scala-Objekt serialisiert (Standard?)?

Dies ist ein funktionierendes Codebeispiel:

object working extends App {
    val list = List(1,2,3)

    val rddList = Spark.ctx.parallelize(list)
    //calling function outside closure 
    val after = rddList.map(someFunc(_))

    def someFunc(a:Int)  = a+1

    after.collect().map(println(_))
}

Dies ist das nicht funktionierende Beispiel:

object NOTworking extends App {
  new testing().doIT
}

//adding extends Serializable wont help
class testing {  
  val list = List(1,2,3)  
  val rddList = Spark.ctx.parallelize(list)

  def doIT =  {
    //again calling the fucntion someFunc 
    val after = rddList.map(someFunc(_))
    //this will crash (spark lazy)
    after.collect().map(println(_))
  }

  def someFunc(a:Int) = a+1
}
Nimrod007
quelle
Was ist Spark.ctx? Es gibt kein Spark-Objekt mit der Methode ctx AFAICT
javadba

Antworten:

333

RDDs erweitern die Serialisable-Schnittstelle , sodass Ihre Aufgabe nicht fehlschlägt. Dies bedeutet nicht, dass Sie eine RDDmit Spark serialisieren und vermeiden könnenNotSerializableException

Spark ist eine verteilte Computer-Engine und ihre Hauptabstraktion ist ein ausfallsicheres verteiltes Dataset ( RDD ), das als verteilte Sammlung angesehen werden kann. Grundsätzlich sind die RDD-Elemente auf die Knoten des Clusters verteilt, aber Spark abstrahiert dies vom Benutzer weg, sodass der Benutzer mit der RDD (Sammlung) interagieren kann, als wäre es eine lokale.

Nicht , um in zu vielen Details, aber wenn man verschiedene Transformationen auf einem RDD laufen ( map, flatMap, filterund andere), Ihr Transformationscode (Verschluss) ist:

  1. auf dem Treiberknoten serialisiert,
  2. an die entsprechenden Knoten im Cluster gesendet,
  3. deserialisiert,
  4. und schließlich auf den Knoten ausgeführt

Sie können dies natürlich lokal ausführen (wie in Ihrem Beispiel), aber alle diese Phasen (außer Versand über Netzwerk) treten weiterhin auf. [Auf diese Weise können Sie Fehler bereits vor der Bereitstellung in der Produktion erkennen.]

In Ihrem zweiten Fall rufen Sie eine Methode auf, die in der Klasse testinginnerhalb der Map-Funktion definiert ist. Spark sieht das und da Methoden nicht alleine serialisiert werden können, versucht Spark, die gesamte testing Klasse zu serialisieren , damit der Code weiterhin funktioniert, wenn er in einer anderen JVM ausgeführt wird. Sie haben zwei Möglichkeiten:

Entweder machen Sie Klassentests serialisierbar, sodass die gesamte Klasse von Spark serialisiert werden kann:

import org.apache.spark.{SparkContext,SparkConf}

object Spark {
  val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}

object NOTworking extends App {
  new Test().doIT
}

class Test extends java.io.Serializable {
  val rddList = Spark.ctx.parallelize(List(1,2,3))

  def doIT() =  {
    val after = rddList.map(someFunc)
    after.collect().foreach(println)
  }

  def someFunc(a: Int) = a + 1
}

oder Sie machen eine someFuncFunktion anstelle einer Methode (Funktionen sind Objekte in Scala), damit Spark sie serialisieren kann:

import org.apache.spark.{SparkContext,SparkConf}

object Spark {
  val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}

object NOTworking extends App {
  new Test().doIT
}

class Test {
  val rddList = Spark.ctx.parallelize(List(1,2,3))

  def doIT() =  {
    val after = rddList.map(someFunc)
    after.collect().foreach(println)
  }

  val someFunc = (a: Int) => a + 1
}

Ein ähnliches, aber nicht dasselbe Problem bei der Klassenserialisierung kann für Sie von Interesse sein, und Sie können es in dieser Präsentation zum Spark Summit 2013 nachlesen .

Als Randnotiz, können Sie umschreiben rddList.map(someFunc(_))zu rddList.map(someFunc), sie sind genau gleich. Normalerweise wird die zweite bevorzugt, da sie weniger ausführlich und sauber zu lesen ist.

BEARBEITEN (2015-03-15): SPARK-5307 führte SerializationDebugger ein und Spark 1.3.0 ist die erste Version, die es verwendet. Es fügt einer NotSerializableException einen Serialisierungspfad hinzu . Wenn eine NotSerializableException auftritt, besucht der Debugger das Objektdiagramm, um den Pfad zum Objekt zu finden, der nicht serialisiert werden kann, und erstellt Informationen, die dem Benutzer helfen, das Objekt zu finden.

Im Fall von OP wird Folgendes auf stdout gedruckt:

Serialization stack:
    - object not serializable (class: testing, value: testing@2dfe2f00)
    - field (class: testing$$anonfun$1, name: $outer, type: class testing)
    - object (class testing$$anonfun$1, <function1>)
Grega Kešpret
quelle
1
Hmm, was Sie erklärt haben, macht sicherlich Sinn und erklärt, warum die gesamte Klasse serialisiert wird (etwas, das ich nicht vollständig verstanden habe). Trotzdem werde ich immer noch der Meinung sein, dass Festplatten nicht serialisierbar sind (nun, sie erweitern Serializable, aber das bedeutet nicht, dass sie keine NotSerializableException verursachen, versuchen Sie es). Aus diesem Grund wird der Fehler behoben, wenn Sie sie außerhalb von Klassen platzieren. Ich werde meine Antwort ein wenig bearbeiten, um genauer zu sagen, was ich meine - dh sie verursachen die Ausnahme, nicht dass sie die Benutzeroberfläche erweitern.
Samthebest
35
Falls Sie keine Kontrolle über die Klasse haben, müssen Sie serialisierbar sein ... Wenn Sie Scala verwenden, können Sie sie einfach mit Serializable instanziieren:val test = new Test with Serializable
Mark S
4
"rddList.map (someFunc (_)) bis rddList.map (someFunc) sind genau gleich" Nein, sie sind nicht genau gleich, und tatsächlich kann die Verwendung der letzteren Serialisierungsausnahmen verursachen, wenn die ersteren dies nicht tun würden.
Samthebest
1
@samthebest Könnten Sie bitte erklären, warum map (someFunc (_)) keine Serialisierungsausnahmen verursachen würde, während map (someFunc) dies tun würde?
Alon
31

Gregas Antwort ist großartig, um zu erklären, warum der ursprüngliche Code nicht funktioniert, und um das Problem auf zwei Arten zu beheben. Diese Lösung ist jedoch nicht sehr flexibel; Betrachten Sie den Fall, in dem Ihr Abschluss einen Methodenaufruf für eine Nichtklasse enthält Serializable, über die Sie keine Kontrolle haben. Sie können Serializabledieser Klasse weder das Tag hinzufügen noch die zugrunde liegende Implementierung ändern, um die Methode in eine Funktion umzuwandeln.

Nilesh bietet hierfür eine großartige Problemumgehung , aber die Lösung kann sowohl präziser als auch allgemeiner gestaltet werden:

def genMapper[A, B](f: A => B): A => B = {
  val locker = com.twitter.chill.MeatLocker(f)
  x => locker.get.apply(x)
}

Dieser Funktions-Serializer kann dann verwendet werden, um Verschlüsse und Methodenaufrufe automatisch zu verpacken:

rdd map genMapper(someFunc)

Diese Technik hat auch den Vorteil, dass für den Zugriff keine zusätzlichen Shark-Abhängigkeiten erforderlich sind KryoSerializationWrapper, da Twitter's Chill bereits von Core Spark übernommen wird

Ben Sidhom
quelle
Hallo, ich frage mich, ob ich etwas registrieren muss, wenn ich Ihren Code verwende. Ich habe versucht, eine Ausnahme für die Klasse "Unable find" von kryo zu erhalten. THX
G_cy
25

Vollständiger Vortrag zur vollständigen Erläuterung des Problems, das einen großartigen Paradigmenwechsel vorschlägt, um diese Serialisierungsprobleme zu vermeiden: https://github.com/samthebest/dump/blob/master/sams-scala-tutorial/serialization-exceptions-and-memory- Lecks-no-ws.md

Die am häufigsten gewählte Antwort schlägt im Grunde vor, ein ganzes Sprachmerkmal wegzuwerfen - das heißt, es werden keine Methoden mehr verwendet und nur noch Funktionen. In der Tat sollten Methoden zur funktionalen Programmierung in Klassen vermieden werden, aber ihre Umwandlung in Funktionen löst das Designproblem hier nicht (siehe obigen Link).

Als schnelle Lösung in dieser speziellen Situation können Sie einfach die @transientAnmerkung verwenden, um anzuweisen, dass nicht versucht werden soll, den fehlerhaften Wert zu serialisieren (hier Spark.ctxhandelt es sich um eine benutzerdefinierte Klasse, die nach der Benennung von OP nicht die von Spark ist):

@transient
val rddList = Spark.ctx.parallelize(list)

Sie können den Code auch so umstrukturieren, dass rddList woanders lebt, aber das ist auch böse.

Die Zukunft ist wahrscheinlich Sporen

In Zukunft wird Scala diese Dinge enthalten, die als "Sporen" bezeichnet werden und die es uns ermöglichen sollen, die Feinkornkontrolle zu steuern, was durch einen Verschluss genau hineingezogen wird und was nicht. Darüber hinaus sollte dies alle Fehler beim versehentlichen Abrufen nicht serialisierbarer Typen (oder unerwünschter Werte) in Kompilierungsfehler verwandeln und nicht jetzt, was schreckliche Laufzeitausnahmen / Speicherlecks darstellt.

http://docs.scala-lang.org/sips/pending/spores.html

Ein Tipp zur Kryo-Serialisierung

Stellen Sie bei der Verwendung von kyro sicher, dass eine Registrierung erforderlich ist. Dies bedeutet, dass Sie Fehler anstelle von Speicherlecks erhalten:

"Schließlich weiß ich, dass kryo kryo.setRegistrationOptional (true) hat, aber es fällt mir sehr schwer, herauszufinden, wie ich es verwenden soll. Wenn diese Option aktiviert ist, scheint kryo immer noch Ausnahmen auszulösen, wenn ich mich nicht registriert habe Klassen."

Strategie zur Anmeldung von Klassen bei kryo

Dies gibt Ihnen natürlich nur eine Kontrolle auf Typebene, keine Kontrolle auf Wertebene.

... weitere Ideen folgen.

samthebest
quelle
9

Ich habe dieses Problem mit einem anderen Ansatz gelöst. Sie müssen lediglich die Objekte serialisieren, bevor Sie den Verschluss durchlaufen, und anschließend de-serialisieren. Dieser Ansatz funktioniert nur, auch wenn Ihre Klassen nicht serialisierbar sind, da Kryo hinter den Kulissen verwendet wird. Alles was Sie brauchen ist etwas Curry. ;)

Hier ist ein Beispiel, wie ich es gemacht habe:

def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)])
               (foo: Foo) : Bar = {
    kryoWrapper.value.apply(foo)
}
val mapper = genMapper(KryoSerializationWrapper(new Blah(abc))) _
rdd.flatMap(mapper).collectAsMap()

object Blah(abc: ABC) extends (Foo => Bar) {
    def apply(foo: Foo) : Bar = { //This is the real function }
}

Fühlen Sie sich frei, Blah so kompliziert zu machen, wie Sie möchten, Klasse, Begleitobjekt, verschachtelte Klassen, Verweise auf mehrere Bibliotheken von Drittanbietern.

KryoSerializationWrapper bezieht sich auf: https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala

Nilesh
quelle
Serialisiert dies tatsächlich die Instanz oder erstellt es eine statische Instanz und serialisiert eine Referenz (siehe meine Antwort)?
Samthebest
2
@samthebest könntest du das näher erläutern? Wenn Sie nachforschen, werden KryoSerializationWrapperSie feststellen, dass Spark denkt, dass es tatsächlich java.io.Serializable- es serialisiert das Objekt einfach intern mit Kryo - schneller und einfacher ist. Und ich glaube nicht, dass es sich um eine statische Instanz handelt - es de-serialisiert nur den Wert, wenn value.apply () aufgerufen wird.
Nilesh
8

Ich stand vor einem ähnlichen Problem, und was ich aus Gregas Antwort verstehe , ist

object NOTworking extends App {
 new testing().doIT
}
//adding extends Serializable wont help
class testing {

val list = List(1,2,3)

val rddList = Spark.ctx.parallelize(list)

def doIT =  {
  //again calling the fucntion someFunc 
  val after = rddList.map(someFunc(_))
  //this will crash (spark lazy)
  after.collect().map(println(_))
}

def someFunc(a:Int) = a+1

}

Ihre doIT- Methode versucht, eine someFunc (_) -Methode zu serialisieren. Da die Methode jedoch nicht serialisierbar ist, versucht sie, Klassentests zu serialisieren , die wiederum nicht serialisierbar sind.

Damit Ihr Code funktioniert, sollten Sie someFunc in der doIT- Methode definieren. Beispielsweise:

def doIT =  {
 def someFunc(a:Int) = a+1
  //function definition
 }
 val after = rddList.map(someFunc(_))
 after.collect().map(println(_))
}

Und wenn mehrere Funktionen ins Bild kommen, sollten alle diese Funktionen für den übergeordneten Kontext verfügbar sein.

Tarang Bhalodia
quelle
7

Ich bin nicht ganz sicher, ob dies für Scala gilt, aber in Java habe ich das Problem gelöst, NotSerializableExceptionindem ich meinen Code so umgestaltet habe, dass der Abschluss nicht auf ein nicht serialisierbares finalFeld zugegriffen hat.

Trebor unhöflich
quelle
Ich habe das gleiche Problem in Java, ich versuche, FileWriter-Klasse aus Java IO-Paket in RDD foreach-Methode zu verwenden. Können Sie mir bitte mitteilen, wie wir das lösen können?
Shankar
1
Nun @Shankar, wenn das FileWriterein finalFeld der äußeren Klasse ist, kannst du es nicht tun. Aber FileWriteraus einem aufgebaut sein Stringoder eine File, die beide Serializable. Überarbeiten Sie also Ihren Code, um einen lokalen Code FileWriterbasierend auf dem Dateinamen der äußeren Klasse zu erstellen .
Trebor Rude
0

Zu Ihrer Information, in Spark 2.4 werden wahrscheinlich viele von Ihnen auf dieses Problem stoßen. Die Kryo-Serialisierung ist besser geworden, aber in vielen Fällen können Sie spark.kryo.unsafe = true oder den naiven Kryo-Serializer nicht verwenden.

Versuchen Sie für eine schnelle Lösung Folgendes in Ihrer Spark-Konfiguration zu ändern

spark.kryo.unsafe="false"

ODER

spark.serializer="org.apache.spark.serializer.JavaSerializer"

Ich ändere benutzerdefinierte RDD-Transformationen, auf die ich stoße oder die ich persönlich schreibe, indem ich explizite Broadcast-Variablen verwende und die neue integrierte Twitter-Chill-API verwende, um sie von rdd.map(row =>in rdd.mapPartitions(partition => {Funktionen zu konvertieren .

Beispiel

Alter (nicht großer) Weg

val sampleMap = Map("index1" -> 1234, "index2" -> 2345)
val outputRDD = rdd.map(row => {
    val value = sampleMap.get(row._1)
    value
})

Alternativer (besserer) Weg

import com.twitter.chill.MeatLocker
val sampleMap = Map("index1" -> 1234, "index2" -> 2345)
val brdSerSampleMap = spark.sparkContext.broadcast(MeatLocker(sampleMap))

rdd.mapPartitions(partition => {
    val deSerSampleMap = brdSerSampleMap.value.get
    partition.map(row => {
        val value = sampleMap.get(row._1)
        value
    }).toIterator
})

Auf diese neue Weise wird die Broadcast-Variable nur einmal pro Partition aufgerufen, was besser ist. Sie müssen weiterhin die Java-Serialisierung verwenden, wenn Sie keine Klassen registrieren.

Gabe Kirche
quelle