Aktualisieren
Diese Antwort ist immer noch gültig und informativ, obwohl die Dinge jetzt besser sind seit 2.2 / 2.3, die Encoder - Unterstützung Built-in fügt für Set
, Seq
, Map
, Date
, Timestamp
, und BigDecimal
. Wenn Sie sich daran halten, Typen nur mit Fallklassen und den üblichen Scala-Typen zu erstellen, sollten Sie nur mit dem impliziten in einverstanden sein SQLImplicits
.
Leider wurde praktisch nichts hinzugefügt, um dies zu unterstützen. Die Suche nach @since 2.0.0
in Encoders.scala
oder SQLImplicits.scala
Funden Dinge meist mit primitiven Typen (und einige Optimierungen von Fallklassen) zu tun. Als erstes zu sagen: Derzeit gibt es keine wirklich gute Unterstützung für benutzerdefinierte Klassencodierer . Nachdem dies aus dem Weg geräumt ist, folgen einige Tricks, die so gut funktionieren, wie wir es uns je erhoffen können, wenn man bedenkt, was uns derzeit zur Verfügung steht. Als Vorab-Haftungsausschluss: Dies funktioniert nicht perfekt und ich werde mein Bestes tun, um alle Einschränkungen klar und deutlich zu machen.
Was genau ist das Problem
Wenn Sie ein Dataset erstellen möchten, benötigt Spark "einen Encoder (um ein JVM-Objekt vom Typ T in und aus der internen Spark-SQL-Darstellung zu konvertieren), der im Allgemeinen automatisch durch Implikationen von a erstellt SparkSession
wird oder explizit durch Aufrufen statischer Methoden erstellt werden kann on Encoders
"(entnommen aus den Dokumenten oncreateDataset
). Ein Encoder hat die Form, Encoder[T]
in der T
sich der Typ befindet, den Sie codieren. Der erste Vorschlag ist das Hinzufügen import spark.implicits._
(was Ihnen diese impliziten Encoder gibt) und der zweite Vorschlag ist das explizite Übergeben des impliziten Encoders unter Verwendung dieses Satzes von Encoder-bezogenen Funktionen.
Für reguläre Klassen steht also kein Encoder zur Verfügung
import spark.implicits._
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
gibt Ihnen den folgenden impliziten Fehler bei der Kompilierung:
Encoder für Typ, der in einem Datensatz gespeichert ist, kann nicht gefunden werden. Primitive Typen (Int, String usw.) und Produkttypen (Fallklassen) werden durch den Import von sqlContext.implicits unterstützt. Die Unterstützung für die Serialisierung anderer Typen wird in zukünftigen Versionen hinzugefügt
Wenn Sie jedoch den Typ einschließen, den Sie gerade verwendet haben, um den obigen Fehler in einer erweiterten Klasse zu erhalten, Product
wird der Fehler verwirrenderweise auf die Laufzeit verzögert
import spark.implicits._
case class Wrap[T](unwrap: T)
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))
Kompiliert einwandfrei, schlägt aber zur Laufzeit mit fehl
java.lang.UnsupportedOperationException: Für MyObj wurde kein Encoder gefunden
Der Grund dafür ist, dass die Encoder, die Spark mit den Implicits erstellt, tatsächlich nur zur Laufzeit (über Scala Relfection) erstellt werden. In diesem Fall besteht alle Spark-Überprüfungen zur Kompilierungszeit darin, dass die äußerste Klasse erweitert wird Product
(was bei allen Fallklassen der Fall ist) und erst zur Laufzeit feststellt, dass sie immer noch nicht weiß, was sie tun sollen MyObj
(dasselbe Problem tritt auf, wenn ich versucht habe, dies zu tun a Dataset[(Int,MyObj)]
- Spark wartet bis zur Laufzeit, bis er aktiviert ist MyObj
. Dies sind zentrale Probleme, die dringend behoben werden müssen:
- Einige Klassen, die die
Product
Kompilierung erweitern , obwohl sie zur Laufzeit immer abstürzen und
- Es gibt keine Möglichkeit, benutzerdefinierte Encoder für verschachtelte Typen zu übergeben (ich habe keine Möglichkeit, Spark einen Encoder nur
MyObj
so zuzuführen, dass er dann weiß, wie man codiert Wrap[MyObj]
oder (Int,MyObj)
).
Benutz einfach kryo
Die Lösung, die jeder vorschlägt, ist die Verwendung des kryo
Encoders.
import spark.implicits._
class MyObj(val i: Int)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
Dies wird jedoch ziemlich schnell langweilig. Vor allem, wenn Ihr Code alle Arten von Datensätzen manipuliert, zusammenfügt, gruppiert usw. Sie haben am Ende eine Reihe zusätzlicher Implikationen. Warum also nicht einfach ein Implizit machen, das dies alles automatisch erledigt?
import scala.reflect.ClassTag
implicit def kryoEncoder[A](implicit ct: ClassTag[A]) =
org.apache.spark.sql.Encoders.kryo[A](ct)
Und jetzt kann ich anscheinend fast alles tun, was ich will (das folgende Beispiel funktioniert nicht, spark-shell
wenn spark.implicits._
es automatisch importiert wird).
class MyObj(val i: Int)
val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and ..
val d3 = d1.map(d => (d.i, d)).alias("d3") // .. deals with the new type
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom!
Oder fast. Das Problem ist, dass die Verwendung dazu kryo
führt, dass Spark nur jede Zeile im Dataset als flaches binäres Objekt speichert. Für map
, filter
, foreach
die genug ist, aber für Operationen wie join
Spark wirklich braucht diese in Spalten getrennt werden. Wenn Sie das Schema auf d2
oder untersuchen d3
, sehen Sie, dass es nur eine Binärspalte gibt:
d2.printSchema
// root
// |-- value: binary (nullable = true)
Teillösung für Tupel
Mit der Magie der Implikits in Scala (mehr in 6.26.3 Überladen der Auflösung ) kann ich mir eine Reihe von Implikits erstellen , die zumindest für Tupel so gute Arbeit wie möglich leisten und mit vorhandenen Implikits gut funktionieren:
import org.apache.spark.sql.{Encoder,Encoders}
import scala.reflect.ClassTag
import spark.implicits._ // we can still take advantage of all the old implicits
implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)
implicit def tuple2[A1, A2](
implicit e1: Encoder[A1],
e2: Encoder[A2]
): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2)
implicit def tuple3[A1, A2, A3](
implicit e1: Encoder[A1],
e2: Encoder[A2],
e3: Encoder[A3]
): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3)
// ... you can keep making these
Mit diesen Implikationen kann ich dann mein Beispiel oben zum Laufen bringen, wenn auch mit einigen Spaltenumbenennungen
class MyObj(val i: Int)
val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d2")
val d3 = d1.map(d => (d.i ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3")
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")
Ich habe noch nicht herausgefunden, wie die erwarteten Tupel Namen zu erhalten ( _1
, _2
, ...) standardmäßig ohne sie zu umbenennen - wenn jemand anderes mit diesem spielen , um will, das ist , wo der Name "value"
eingeführt wird und dies ist , wo das Tupel Namen werden normalerweise hinzugefügt. Der entscheidende Punkt ist jedoch, dass ich jetzt ein schön strukturiertes Schema habe:
d4.printSchema
// root
// |-- _1: struct (nullable = false)
// | |-- _1: integer (nullable = true)
// | |-- _2: binary (nullable = true)
// |-- _2: struct (nullable = false)
// | |-- _1: integer (nullable = true)
// | |-- _2: binary (nullable = true)
Zusammenfassend diese Problemumgehung:
- ermöglicht es uns, separate Spalten für Tupel zu erhalten (damit wir uns wieder Tupeln anschließen können, yay!)
- Wir können uns wieder nur auf die Implikationen verlassen (es ist also nicht nötig, überall vorbeizukommen
kryo
).
- ist fast vollständig abwärtskompatibel mit
import spark.implicits._
(mit einigen Umbenennungen)
- ist nicht lassen Sie uns auf die Join -
kyro
serialisiert binären Spalten, geschweige denn auf den Feldern diejenigen haben
- hat den unangenehmen Nebeneffekt, dass einige der Tupelspalten in "Wert" umbenannt werden (falls erforderlich, kann dies durch Konvertieren
.toDF
, Angeben neuer Spaltennamen und Zurückkonvertieren in ein Dataset rückgängig gemacht werden - und die Schemanamen scheinen durch Verknüpfungen erhalten zu bleiben , wo sie am dringendsten gebraucht werden).
Teillösung für Klassen im Allgemeinen
Dieser ist weniger angenehm und hat keine gute Lösung. Jetzt, da wir die obige Tupellösung haben, habe ich die Vermutung, dass die implizite Konvertierungslösung aus einer anderen Antwort auch etwas weniger schmerzhaft sein wird, da Sie Ihre komplexeren Klassen in Tupel konvertieren können. Nach dem Erstellen des Datasets würden Sie die Spalten wahrscheinlich mithilfe des Datenrahmenansatzes umbenennen. Wenn alles gut geht, ist dies wirklich eine Verbesserung, da ich jetzt Joins auf den Feldern meiner Klassen durchführen kann. Wenn ich nur einen flachen binären kryo
Serializer verwendet hätte, wäre das nicht möglich gewesen.
Hier ist ein Beispiel, das ein bisschen von allem macht: Ich habe eine Klasse, MyObj
die Felder von Typen Int
hat java.util.UUID
, und Set[String]
. Der erste kümmert sich um sich. Die zweite, obwohl ich mit serialisieren könnte, kryo
wäre nützlicher, wenn sie als gespeichert würde String
(da UUID
s normalerweise etwas sind, gegen das ich mich anschließen möchte). Der dritte gehört wirklich nur in eine binäre Spalte.
class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String])
// alias for the type to convert to and from
type MyObjEncoded = (Int, String, Set[String])
// implicit conversions
implicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s)
implicit def fromEncoded(e: MyObjEncoded): MyObj =
new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)
Jetzt kann ich mit dieser Maschine einen Datensatz mit einem schönen Schema erstellen:
val d = spark.createDataset(Seq[MyObjEncoded](
new MyObj(1, java.util.UUID.randomUUID, Set("foo")),
new MyObj(2, java.util.UUID.randomUUID, Set("bar"))
)).toDF("i","u","s").as[MyObjEncoded]
Und das Schema zeigt mir Spalten mit den richtigen Namen und mit den ersten beiden Dingen, gegen die ich mich anschließen kann.
d.printSchema
// root
// |-- i: integer (nullable = false)
// |-- u: string (nullable = true)
// |-- s: binary (nullable = true)
ExpressionEncoder
mithilfe der JSON-Serialisierung zu erstellen ? In meinem Fall kann ich nicht mit Tupeln davonkommen, und Kryo gibt mir eine binäre Spalte.Verwendung generischer Encoder.
Es gibt zwei allgemeine verfügbaren Encoder für jetzt
kryo
undjavaSerialization
wo letzteres ausdrücklich beschrieben wird:Angenommen, folgende Klasse
Sie können diese Encoder verwenden, indem Sie einen impliziten Encoder hinzufügen:
die wie folgt zusammen verwendet werden können:
Es speichert Objekte als
binary
Spalte, sodass Sie bei der KonvertierungDataFrame
das folgende Schema erhalten:Es ist auch möglich, Tupel mit einem
kryo
Encoder für ein bestimmtes Feld zu codieren :Bitte beachten Sie, dass wir hier nicht von impliziten Encodern abhängig sind, sondern den Encoder explizit übergeben, sodass dies höchstwahrscheinlich nicht mit der
toDS
Methode funktioniert .Verwenden impliziter Konvertierungen:
Stellen Sie implizite Konvertierungen zwischen einer codierbaren Darstellung und einer benutzerdefinierten Klasse bereit, z. B.:
Verwandte Fragen:
quelle
Set
) für typisierte Sammlungen, die ich bekomme, nicht zu funktionierenException in thread "main" java.lang.UnsupportedOperationException: No Encoder found for Set[Bar]
.kryo[Set[Bar]]
. Wenn eine Klasse ein Feld enthältBar
, benötigen Sie einen Encoder für ein ganzes Objekt. Dies sind sehr grobe Methoden.Bar
, benötigen Sie einen Encoder für ein ganzes Objekt." Meine Frage war, wie man dieses "ganze Projekt" verschlüsselt.Sie können UDTRegistration verwenden und dann Fallklassen, Tupel usw. funktionieren alle korrekt mit Ihrem benutzerdefinierten Typ!
Angenommen, Sie möchten eine benutzerdefinierte Aufzählung verwenden:
Registrieren Sie es so:
Dann BENUTZEN SIE ES!
Angenommen, Sie möchten einen polymorphen Datensatz verwenden:
... und die Verwendung so:
Sie können eine benutzerdefinierte UDT schreiben, die alles in Bytes codiert (ich verwende hier die Java-Serialisierung, aber es ist wahrscheinlich besser, den Kryo-Kontext von Spark zu instrumentieren).
Definieren Sie zuerst die UDT-Klasse:
Dann registrieren Sie es:
Dann können Sie es benutzen!
quelle
Encoder funktionieren in mehr oder weniger gleich
Spark2.0
. UndKryo
ist immer noch die empfohleneserialization
Wahl.Sie können das folgende Beispiel mit Spark-Shell betrachten
Bis jetzt gab es keinen
appropriate encoders
gegenwärtigen Umfang, so dass unsere Personen nicht alsbinary
Werte verschlüsselt wurden . Dies wird sich jedoch ändern, sobald wir einigeimplicit
Encoder mitKryo
Serialisierung bereitstellen .quelle
Im Fall der Java Bean-Klasse kann dies nützlich sein
Jetzt können Sie den Datenrahmen einfach als benutzerdefinierten Datenrahmen lesen
Dadurch wird ein benutzerdefinierter Klassencodierer erstellt und kein binärer.
quelle
Meine Beispiele werden in Java sein, aber ich kann mir nicht vorstellen, dass es schwierig ist, sich an Scala anzupassen.
Ich habe recht erfolgreich Umwandlung
RDD<Fruit>
zuDataset<Fruit>
verwenden spark.createDataset und Encoders.bean solangeFruit
ein einfaches Java Bean .Schritt 1: Erstellen Sie die einfache Java Bean.
Ich würde mich an Klassen mit primitiven Typen und String als Felder halten, bevor die DataBricks-Leute ihre Encoder verbessern. Wenn Sie eine Klasse mit verschachteltem Objekt haben, erstellen Sie eine weitere einfache Java Bean mit allen abgeflachten Feldern, damit Sie den komplexen Typ mithilfe von RDD-Transformationen dem einfacheren zuordnen können. Sicher, es ist ein wenig zusätzliche Arbeit, aber ich kann mir vorstellen, dass es bei der Arbeit mit einem flachen Schema sehr hilfreich sein wird.
Schritt 2: Holen Sie sich Ihren Datensatz vom RDD
Und voila! Aufschäumen, ausspülen, wiederholen.
quelle
Für diejenigen, die in meiner Situation mögen, habe ich auch hier meine Antwort gestellt.
Um genau zu sein,
Ich habe 'Setze typisierte Daten' aus SQLContext gelesen. Das ursprüngliche Datenformat ist also DataFrame.
val sample = spark.sqlContext.sql("select 1 as a, collect_set(1) as b limit 1") sample.show()
+---+---+ | a| b| +---+---+ | 1|[1]| +---+---+
Konvertieren Sie es dann mit rdd.map () mit dem Typ mutable.WrappedArray in RDD.
sample .rdd.map(r => (r.getInt(0), r.getAs[mutable.WrappedArray[Int]](1).toSet)) .collect() .foreach(println)
Ergebnis:
(1,Set(1))
quelle
Zusätzlich zu den bereits gegebenen Vorschlägen habe ich kürzlich festgestellt, dass Sie Ihre benutzerdefinierte Klasse einschließlich des Merkmals deklarieren können
org.apache.spark.sql.catalyst.DefinedByConstructorParams
.Dies funktioniert, wenn die Klasse über einen Konstruktor verfügt, der Typen verwendet, die der ExpressionEncoder verstehen kann, dh primitive Werte und Standardsammlungen. Dies kann nützlich sein, wenn Sie die Klasse nicht als Fallklasse deklarieren können, sie jedoch nicht jedes Mal mit Kryo codieren möchten, wenn sie in einem Datensatz enthalten ist.
Zum Beispiel wollte ich eine Fallklasse deklarieren, die einen Breeze-Vektor enthält. Der einzige Encoder, der das handhaben könnte, wäre normalerweise Kryo. Wenn ich jedoch eine Unterklasse deklarierte, die Breeze DenseVector und DefinedByConstructorParams erweiterte, verstand der ExpressionEncoder, dass sie als Array von Doubles serialisiert werden kann.
So habe ich es erklärt:
Jetzt kann ich
SerializableDenseVector
in einem Datensatz (direkt oder als Teil eines Produkts) einen einfachen ExpressionEncoder und kein Kryo verwenden. Es funktioniert genau wie ein Breeze DenseVector, wird jedoch als Array [Double] serialisiert.quelle