Wie speichere ich benutzerdefinierte Objekte in Dataset?

149

Laut Introducing Spark Datasets :

Da wir uns auf Spark 2.0 freuen, planen wir einige aufregende Verbesserungen an Datensätzen, insbesondere: ... Benutzerdefinierte Encoder - Während wir derzeit Encoder für eine Vielzahl von Typen automatisch generieren, möchten wir eine API für benutzerdefinierte Objekte öffnen.

und versucht, einen benutzerdefinierten Typ in einem Datasetfolgenden Fehler zu speichern :

Encoder für Typ, der in einem Datensatz gespeichert ist, kann nicht gefunden werden. Primitive Typen (Int, String usw.) und Produkttypen (Fallklassen) werden durch den Import von sqlContext.implicits unterstützt. Die Unterstützung für die Serialisierung anderer Typen wird in zukünftigen Versionen hinzugefügt

oder:

Java.lang.UnsupportedOperationException: Kein Encoder gefunden für ....

Gibt es Problemumgehungen?


Beachten Sie, dass diese Frage nur als Einstiegspunkt für eine Community-Wiki-Antwort dient. Fühlen Sie sich frei, sowohl Frage als auch Antwort zu aktualisieren / zu verbessern.

null323
quelle

Antworten:

240

Aktualisieren

Diese Antwort ist immer noch gültig und informativ, obwohl die Dinge jetzt besser sind seit 2.2 / 2.3, die Encoder - Unterstützung Built-in fügt für Set, Seq, Map, Date, Timestamp, und BigDecimal. Wenn Sie sich daran halten, Typen nur mit Fallklassen und den üblichen Scala-Typen zu erstellen, sollten Sie nur mit dem impliziten in einverstanden sein SQLImplicits.


Leider wurde praktisch nichts hinzugefügt, um dies zu unterstützen. Die Suche nach @since 2.0.0in Encoders.scalaoder SQLImplicits.scalaFunden Dinge meist mit primitiven Typen (und einige Optimierungen von Fallklassen) zu tun. Als erstes zu sagen: Derzeit gibt es keine wirklich gute Unterstützung für benutzerdefinierte Klassencodierer . Nachdem dies aus dem Weg geräumt ist, folgen einige Tricks, die so gut funktionieren, wie wir es uns je erhoffen können, wenn man bedenkt, was uns derzeit zur Verfügung steht. Als Vorab-Haftungsausschluss: Dies funktioniert nicht perfekt und ich werde mein Bestes tun, um alle Einschränkungen klar und deutlich zu machen.

Was genau ist das Problem

Wenn Sie ein Dataset erstellen möchten, benötigt Spark "einen Encoder (um ein JVM-Objekt vom Typ T in und aus der internen Spark-SQL-Darstellung zu konvertieren), der im Allgemeinen automatisch durch Implikationen von a erstellt SparkSessionwird oder explizit durch Aufrufen statischer Methoden erstellt werden kann on Encoders"(entnommen aus den Dokumenten oncreateDataset ). Ein Encoder hat die Form, Encoder[T]in der Tsich der Typ befindet, den Sie codieren. Der erste Vorschlag ist das Hinzufügen import spark.implicits._(was Ihnen diese impliziten Encoder gibt) und der zweite Vorschlag ist das explizite Übergeben des impliziten Encoders unter Verwendung dieses Satzes von Encoder-bezogenen Funktionen.

Für reguläre Klassen steht also kein Encoder zur Verfügung

import spark.implicits._
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))

gibt Ihnen den folgenden impliziten Fehler bei der Kompilierung:

Encoder für Typ, der in einem Datensatz gespeichert ist, kann nicht gefunden werden. Primitive Typen (Int, String usw.) und Produkttypen (Fallklassen) werden durch den Import von sqlContext.implicits unterstützt. Die Unterstützung für die Serialisierung anderer Typen wird in zukünftigen Versionen hinzugefügt

Wenn Sie jedoch den Typ einschließen, den Sie gerade verwendet haben, um den obigen Fehler in einer erweiterten Klasse zu erhalten, Productwird der Fehler verwirrenderweise auf die Laufzeit verzögert

import spark.implicits._
case class Wrap[T](unwrap: T)
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))

Kompiliert einwandfrei, schlägt aber zur Laufzeit mit fehl

java.lang.UnsupportedOperationException: Für MyObj wurde kein Encoder gefunden

Der Grund dafür ist, dass die Encoder, die Spark mit den Implicits erstellt, tatsächlich nur zur Laufzeit (über Scala Relfection) erstellt werden. In diesem Fall besteht alle Spark-Überprüfungen zur Kompilierungszeit darin, dass die äußerste Klasse erweitert wird Product(was bei allen Fallklassen der Fall ist) und erst zur Laufzeit feststellt, dass sie immer noch nicht weiß, was sie tun sollen MyObj(dasselbe Problem tritt auf, wenn ich versucht habe, dies zu tun a Dataset[(Int,MyObj)]- Spark wartet bis zur Laufzeit, bis er aktiviert ist MyObj. Dies sind zentrale Probleme, die dringend behoben werden müssen:

  • Einige Klassen, die die ProductKompilierung erweitern , obwohl sie zur Laufzeit immer abstürzen und
  • Es gibt keine Möglichkeit, benutzerdefinierte Encoder für verschachtelte Typen zu übergeben (ich habe keine Möglichkeit, Spark einen Encoder nur MyObjso zuzuführen, dass er dann weiß, wie man codiert Wrap[MyObj]oder (Int,MyObj)).

Benutz einfach kryo

Die Lösung, die jeder vorschlägt, ist die Verwendung des kryoEncoders.

import spark.implicits._
class MyObj(val i: Int)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))

Dies wird jedoch ziemlich schnell langweilig. Vor allem, wenn Ihr Code alle Arten von Datensätzen manipuliert, zusammenfügt, gruppiert usw. Sie haben am Ende eine Reihe zusätzlicher Implikationen. Warum also nicht einfach ein Implizit machen, das dies alles automatisch erledigt?

import scala.reflect.ClassTag
implicit def kryoEncoder[A](implicit ct: ClassTag[A]) = 
  org.apache.spark.sql.Encoders.kryo[A](ct)

Und jetzt kann ich anscheinend fast alles tun, was ich will (das folgende Beispiel funktioniert nicht, spark-shellwenn spark.implicits._es automatisch importiert wird).

class MyObj(val i: Int)

val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and ..
val d3 = d1.map(d => (d.i,  d)).alias("d3") // .. deals with the new type
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom!

Oder fast. Das Problem ist, dass die Verwendung dazu kryoführt, dass Spark nur jede Zeile im Dataset als flaches binäres Objekt speichert. Für map, filter, foreachdie genug ist, aber für Operationen wie joinSpark wirklich braucht diese in Spalten getrennt werden. Wenn Sie das Schema auf d2oder untersuchen d3, sehen Sie, dass es nur eine Binärspalte gibt:

d2.printSchema
// root
//  |-- value: binary (nullable = true)

Teillösung für Tupel

Mit der Magie der Implikits in Scala (mehr in 6.26.3 Überladen der Auflösung ) kann ich mir eine Reihe von Implikits erstellen , die zumindest für Tupel so gute Arbeit wie möglich leisten und mit vorhandenen Implikits gut funktionieren:

import org.apache.spark.sql.{Encoder,Encoders}
import scala.reflect.ClassTag
import spark.implicits._  // we can still take advantage of all the old implicits

implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)

implicit def tuple2[A1, A2](
  implicit e1: Encoder[A1],
           e2: Encoder[A2]
): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2)

implicit def tuple3[A1, A2, A3](
  implicit e1: Encoder[A1],
           e2: Encoder[A2],
           e3: Encoder[A3]
): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3)

// ... you can keep making these

Mit diesen Implikationen kann ich dann mein Beispiel oben zum Laufen bringen, wenn auch mit einigen Spaltenumbenennungen

class MyObj(val i: Int)

val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d2")
val d3 = d1.map(d => (d.i  ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3")
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")

Ich habe noch nicht herausgefunden, wie die erwarteten Tupel Namen zu erhalten ( _1, _2, ...) standardmäßig ohne sie zu umbenennen - wenn jemand anderes mit diesem spielen , um will, das ist , wo der Name "value"eingeführt wird und dies ist , wo das Tupel Namen werden normalerweise hinzugefügt. Der entscheidende Punkt ist jedoch, dass ich jetzt ein schön strukturiertes Schema habe:

d4.printSchema
// root
//  |-- _1: struct (nullable = false)
//  |    |-- _1: integer (nullable = true)
//  |    |-- _2: binary (nullable = true)
//  |-- _2: struct (nullable = false)
//  |    |-- _1: integer (nullable = true)
//  |    |-- _2: binary (nullable = true)

Zusammenfassend diese Problemumgehung:

  • ermöglicht es uns, separate Spalten für Tupel zu erhalten (damit wir uns wieder Tupeln anschließen können, yay!)
  • Wir können uns wieder nur auf die Implikationen verlassen (es ist also nicht nötig, überall vorbeizukommen kryo).
  • ist fast vollständig abwärtskompatibel mit import spark.implicits._(mit einigen Umbenennungen)
  • ist nicht lassen Sie uns auf die Join - kyroserialisiert binären Spalten, geschweige denn auf den Feldern diejenigen haben
  • hat den unangenehmen Nebeneffekt, dass einige der Tupelspalten in "Wert" umbenannt werden (falls erforderlich, kann dies durch Konvertieren .toDF, Angeben neuer Spaltennamen und Zurückkonvertieren in ein Dataset rückgängig gemacht werden - und die Schemanamen scheinen durch Verknüpfungen erhalten zu bleiben , wo sie am dringendsten gebraucht werden).

Teillösung für Klassen im Allgemeinen

Dieser ist weniger angenehm und hat keine gute Lösung. Jetzt, da wir die obige Tupellösung haben, habe ich die Vermutung, dass die implizite Konvertierungslösung aus einer anderen Antwort auch etwas weniger schmerzhaft sein wird, da Sie Ihre komplexeren Klassen in Tupel konvertieren können. Nach dem Erstellen des Datasets würden Sie die Spalten wahrscheinlich mithilfe des Datenrahmenansatzes umbenennen. Wenn alles gut geht, ist dies wirklich eine Verbesserung, da ich jetzt Joins auf den Feldern meiner Klassen durchführen kann. Wenn ich nur einen flachen binären kryoSerializer verwendet hätte, wäre das nicht möglich gewesen.

Hier ist ein Beispiel, das ein bisschen von allem macht: Ich habe eine Klasse, MyObjdie Felder von Typen Inthat java.util.UUID, und Set[String]. Der erste kümmert sich um sich. Die zweite, obwohl ich mit serialisieren könnte, kryowäre nützlicher, wenn sie als gespeichert würde String(da UUIDs normalerweise etwas sind, gegen das ich mich anschließen möchte). Der dritte gehört wirklich nur in eine binäre Spalte.

class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String])

// alias for the type to convert to and from
type MyObjEncoded = (Int, String, Set[String])

// implicit conversions
implicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s)
implicit def fromEncoded(e: MyObjEncoded): MyObj =
  new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)

Jetzt kann ich mit dieser Maschine einen Datensatz mit einem schönen Schema erstellen:

val d = spark.createDataset(Seq[MyObjEncoded](
  new MyObj(1, java.util.UUID.randomUUID, Set("foo")),
  new MyObj(2, java.util.UUID.randomUUID, Set("bar"))
)).toDF("i","u","s").as[MyObjEncoded]

Und das Schema zeigt mir Spalten mit den richtigen Namen und mit den ersten beiden Dingen, gegen die ich mich anschließen kann.

d.printSchema
// root
//  |-- i: integer (nullable = false)
//  |-- u: string (nullable = true)
//  |-- s: binary (nullable = true)
Alec
quelle
Ist es möglich, eine benutzerdefinierte Klasse ExpressionEncodermithilfe der JSON-Serialisierung zu erstellen ? In meinem Fall kann ich nicht mit Tupeln davonkommen, und Kryo gibt mir eine binäre Spalte.
Alexey Svyatkovskiy
1
@AlexeyS Das glaube ich nicht. Aber warum willst du das? Warum können Sie nicht mit der letzten Lösung davonkommen, die ich vorschlage? Wenn Sie Ihre Daten in JSON einfügen können, sollten Sie in der Lage sein, die Felder zu extrahieren und in eine
Alec
1
Leider lautet das Fazit dieser Antwort, dass es keine funktionierende Lösung gibt.
Baol
@baol Art von. Aber denken Sie daran, wie schwierig es ist, was Spark tut. Das Typensystem von Scala ist einfach nicht leistungsfähig genug, um Encoder abzuleiten, die rekursiv durch Felder gehen. Ehrlich gesagt bin ich nur überrascht, dass niemand ein Anmerkungsmakro dafür erstellt hat. Scheint wie die natürliche (aber schwierige) Lösung.
Alec
1
@combinatorist Mein Verständnis ist, dass Datasets und Dataframes (aber keine RDDs, da sie keine Encoder benötigen!) aus Sicht der Leistung gleichwertig sind. Unterschätzen Sie nicht die Typensicherheit von Datensätzen! Nur weil Spark intern eine Menge Reflexionen, Casts usw. verwendet, bedeutet dies nicht, dass Sie sich nicht um die Typensicherheit der exponierten Schnittstelle kümmern sollten. Aber ich fühle mich besser, wenn ich meine eigenen datensatzbasierten typsicheren Funktionen erstelle, die Dataframes unter der Haube verwenden.
Alec
32
  1. Verwendung generischer Encoder.

    Es gibt zwei allgemeine verfügbaren Encoder für jetzt kryound javaSerializationwo letzteres ausdrücklich beschrieben wird:

    extrem ineffizient und sollte nur als letzter Ausweg verwendet werden.

    Angenommen, folgende Klasse

    class Bar(i: Int) {
      override def toString = s"bar $i"
      def bar = i
    }

    Sie können diese Encoder verwenden, indem Sie einen impliziten Encoder hinzufügen:

    object BarEncoders {
      implicit def barEncoder: org.apache.spark.sql.Encoder[Bar] = 
      org.apache.spark.sql.Encoders.kryo[Bar]
    }

    die wie folgt zusammen verwendet werden können:

    object Main {
      def main(args: Array[String]) {
        val sc = new SparkContext("local",  "test", new SparkConf())
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
        import BarEncoders._
    
        val ds = Seq(new Bar(1)).toDS
        ds.show
    
        sc.stop()
      }
    }

    Es speichert Objekte als binarySpalte, sodass Sie bei der Konvertierung DataFramedas folgende Schema erhalten:

    root
     |-- value: binary (nullable = true)

    Es ist auch möglich, Tupel mit einem kryoEncoder für ein bestimmtes Feld zu codieren :

    val longBarEncoder = Encoders.tuple(Encoders.scalaLong, Encoders.kryo[Bar])
    
    spark.createDataset(Seq((1L, new Bar(1))))(longBarEncoder)
    // org.apache.spark.sql.Dataset[(Long, Bar)] = [_1: bigint, _2: binary]

    Bitte beachten Sie, dass wir hier nicht von impliziten Encodern abhängig sind, sondern den Encoder explizit übergeben, sodass dies höchstwahrscheinlich nicht mit der toDSMethode funktioniert .

  2. Verwenden impliziter Konvertierungen:

    Stellen Sie implizite Konvertierungen zwischen einer codierbaren Darstellung und einer benutzerdefinierten Klasse bereit, z. B.:

    object BarConversions {
      implicit def toInt(bar: Bar): Int = bar.bar
      implicit def toBar(i: Int): Bar = new Bar(i)
    }
    
    object Main {
      def main(args: Array[String]) {
        val sc = new SparkContext("local",  "test", new SparkConf())
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
        import BarConversions._
    
        type EncodedBar = Int
    
        val bars: RDD[EncodedBar]  = sc.parallelize(Seq(new Bar(1)))
        val barsDS = bars.toDS
    
        barsDS.show
        barsDS.map(_.bar).show
    
        sc.stop()
      }
    }

Verwandte Fragen:

Null323
quelle
Lösung 1 scheint (zumindest Set) für typisierte Sammlungen, die ich bekomme, nicht zu funktionieren Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for Set[Bar].
Victor P.
@ VictorP. Es wird erwartet, dass ich befürchte. In diesem Fall benötigen Sie einen Encoder für einen bestimmten Typ ( kryo[Set[Bar]]. Wenn eine Klasse ein Feld enthält Bar, benötigen Sie einen Encoder für ein ganzes Objekt. Dies sind sehr grobe Methoden.
zero323
@ zero323 Ich stehe vor dem gleichen Problem. Können Sie ein Codebeispiel für die Codierung des gesamten Projekts einfügen? Danke vielmals!
Rock
@ Rock Ich bin nicht sicher, was du mit "ganzes Projekt"
meinst
@ zero323 pro Kommentar: "Wenn die Klasse ein Feld enthält Bar, benötigen Sie einen Encoder für ein ganzes Objekt." Meine Frage war, wie man dieses "ganze Projekt" verschlüsselt.
Rock
9

Sie können UDTRegistration verwenden und dann Fallklassen, Tupel usw. funktionieren alle korrekt mit Ihrem benutzerdefinierten Typ!

Angenommen, Sie möchten eine benutzerdefinierte Aufzählung verwenden:

trait CustomEnum { def value:String }
case object Foo extends CustomEnum  { val value = "F" }
case object Bar extends CustomEnum  { val value = "B" }
object CustomEnum {
  def fromString(str:String) = Seq(Foo, Bar).find(_.value == str).get
}

Registrieren Sie es so:

// First define a UDT class for it:
class CustomEnumUDT extends UserDefinedType[CustomEnum] {
  override def sqlType: DataType = org.apache.spark.sql.types.StringType
  override def serialize(obj: CustomEnum): Any = org.apache.spark.unsafe.types.UTF8String.fromString(obj.value)
  // Note that this will be a UTF8String type
  override def deserialize(datum: Any): CustomEnum = CustomEnum.fromString(datum.toString)
  override def userClass: Class[CustomEnum] = classOf[CustomEnum]
}

// Then Register the UDT Class!
// NOTE: you have to put this file into the org.apache.spark package!
UDTRegistration.register(classOf[CustomEnum].getName, classOf[CustomEnumUDT].getName)

Dann BENUTZEN SIE ES!

case class UsingCustomEnum(id:Int, en:CustomEnum)

val seq = Seq(
  UsingCustomEnum(1, Foo),
  UsingCustomEnum(2, Bar),
  UsingCustomEnum(3, Foo)
).toDS()
seq.filter(_.en == Foo).show()
println(seq.collect())

Angenommen, Sie möchten einen polymorphen Datensatz verwenden:

trait CustomPoly
case class FooPoly(id:Int) extends CustomPoly
case class BarPoly(value:String, secondValue:Long) extends CustomPoly

... und die Verwendung so:

case class UsingPoly(id:Int, poly:CustomPoly)

Seq(
  UsingPoly(1, new FooPoly(1)),
  UsingPoly(2, new BarPoly("Blah", 123)),
  UsingPoly(3, new FooPoly(1))
).toDS

polySeq.filter(_.poly match {
  case FooPoly(value) => value == 1
  case _ => false
}).show()

Sie können eine benutzerdefinierte UDT schreiben, die alles in Bytes codiert (ich verwende hier die Java-Serialisierung, aber es ist wahrscheinlich besser, den Kryo-Kontext von Spark zu instrumentieren).

Definieren Sie zuerst die UDT-Klasse:

class CustomPolyUDT extends UserDefinedType[CustomPoly] {
  val kryo = new Kryo()

  override def sqlType: DataType = org.apache.spark.sql.types.BinaryType
  override def serialize(obj: CustomPoly): Any = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(obj)

    bos.toByteArray
  }
  override def deserialize(datum: Any): CustomPoly = {
    val bis = new ByteArrayInputStream(datum.asInstanceOf[Array[Byte]])
    val ois = new ObjectInputStream(bis)
    val obj = ois.readObject()
    obj.asInstanceOf[CustomPoly]
  }

  override def userClass: Class[CustomPoly] = classOf[CustomPoly]
}

Dann registrieren Sie es:

// NOTE: The file you do this in has to be inside of the org.apache.spark package!
UDTRegistration.register(classOf[CustomPoly].getName, classOf[CustomPolyUDT].getName)

Dann können Sie es benutzen!

// As shown above:
case class UsingPoly(id:Int, poly:CustomPoly)

Seq(
  UsingPoly(1, new FooPoly(1)),
  UsingPoly(2, new BarPoly("Blah", 123)),
  UsingPoly(3, new FooPoly(1))
).toDS

polySeq.filter(_.poly match {
  case FooPoly(value) => value == 1
  case _ => false
}).show()
ChoppyTheLumberjack
quelle
1
Ich sehe nicht, wo Ihr Kryo verwendet wird (in CustomPolyUDT)
Mathieu
Ich versuche, eine UDT in meinem Projekt zu definieren, und erhalte die Fehlermeldung "Auf das Symbol UserDefinedType kann von diesem Ort aus nicht zugegriffen werden". Irgendeine Hilfe ?
Rijo Joseph
Hallo @RijoJoseph. Sie müssen ein Paket org.apache.spark in Ihrem Projekt erstellen und Ihren UDT-Code darin einfügen.
ChoppyTheLumberjack
6

Encoder funktionieren in mehr oder weniger gleich Spark2.0. Und Kryoist immer noch die empfohlene serializationWahl.

Sie können das folgende Beispiel mit Spark-Shell betrachten

scala> import spark.implicits._
import spark.implicits._

scala> import org.apache.spark.sql.Encoders
import org.apache.spark.sql.Encoders

scala> case class NormalPerson(name: String, age: Int) {
 |   def aboutMe = s"I am ${name}. I am ${age} years old."
 | }
defined class NormalPerson

scala> case class ReversePerson(name: Int, age: String) {
 |   def aboutMe = s"I am ${name}. I am ${age} years old."
 | }
defined class ReversePerson

scala> val normalPersons = Seq(
 |   NormalPerson("Superman", 25),
 |   NormalPerson("Spiderman", 17),
 |   NormalPerson("Ironman", 29)
 | )
normalPersons: Seq[NormalPerson] = List(NormalPerson(Superman,25), NormalPerson(Spiderman,17), NormalPerson(Ironman,29))

scala> val ds1 = sc.parallelize(normalPersons).toDS
ds1: org.apache.spark.sql.Dataset[NormalPerson] = [name: string, age: int]

scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name))
ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string]

scala> ds1.show()
+---------+---+
|     name|age|
+---------+---+
| Superman| 25|
|Spiderman| 17|
|  Ironman| 29|
+---------+---+

scala> ds2.show()
+----+---------+
|name|      age|
+----+---------+
|  25| Superman|
|  17|Spiderman|
|  29|  Ironman|
+----+---------+

scala> ds1.foreach(p => println(p.aboutMe))
I am Ironman. I am 29 years old.
I am Superman. I am 25 years old.
I am Spiderman. I am 17 years old.

scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name))
ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string]

scala> ds2.foreach(p => println(p.aboutMe))
I am 17. I am Spiderman years old.
I am 25. I am Superman years old.
I am 29. I am Ironman years old.

Bis jetzt gab es keinen appropriate encodersgegenwärtigen Umfang, so dass unsere Personen nicht als binaryWerte verschlüsselt wurden . Dies wird sich jedoch ändern, sobald wir einige implicitEncoder mit KryoSerialisierung bereitstellen .

// Provide Encoders

scala> implicit val normalPersonKryoEncoder = Encoders.kryo[NormalPerson]
normalPersonKryoEncoder: org.apache.spark.sql.Encoder[NormalPerson] = class[value[0]: binary]

scala> implicit val reversePersonKryoEncoder = Encoders.kryo[ReversePerson]
reversePersonKryoEncoder: org.apache.spark.sql.Encoder[ReversePerson] = class[value[0]: binary]

// Ecoders will be used since they are now present in Scope

scala> val ds3 = sc.parallelize(normalPersons).toDS
ds3: org.apache.spark.sql.Dataset[NormalPerson] = [value: binary]

scala> val ds4 = ds3.map(np => ReversePerson(np.age, np.name))
ds4: org.apache.spark.sql.Dataset[ReversePerson] = [value: binary]

// now all our persons show up as binary values
scala> ds3.show()
+--------------------+
|               value|
+--------------------+
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
+--------------------+

scala> ds4.show()
+--------------------+
|               value|
+--------------------+
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
+--------------------+

// Our instances still work as expected    

scala> ds3.foreach(p => println(p.aboutMe))
I am Ironman. I am 29 years old.
I am Spiderman. I am 17 years old.
I am Superman. I am 25 years old.

scala> ds4.foreach(p => println(p.aboutMe))
I am 25. I am Superman years old.
I am 29. I am Ironman years old.
I am 17. I am Spiderman years old.
sarveshseri
quelle
3

Im Fall der Java Bean-Klasse kann dies nützlich sein

import spark.sqlContext.implicits._
import org.apache.spark.sql.Encoders
implicit val encoder = Encoders.bean[MyClasss](classOf[MyClass])

Jetzt können Sie den Datenrahmen einfach als benutzerdefinierten Datenrahmen lesen

dataFrame.as[MyClass]

Dadurch wird ein benutzerdefinierter Klassencodierer erstellt und kein binärer.

Akash Mahajan
quelle
1

Meine Beispiele werden in Java sein, aber ich kann mir nicht vorstellen, dass es schwierig ist, sich an Scala anzupassen.

Ich habe recht erfolgreich Umwandlung RDD<Fruit>zu Dataset<Fruit>verwenden spark.createDataset und Encoders.bean solange Fruitein einfaches Java Bean .

Schritt 1: Erstellen Sie die einfache Java Bean.

public class Fruit implements Serializable {
    private String name  = "default-fruit";
    private String color = "default-color";

    // AllArgsConstructor
    public Fruit(String name, String color) {
        this.name  = name;
        this.color = color;
    }

    // NoArgsConstructor
    public Fruit() {
        this("default-fruit", "default-color");
    }

    // ...create getters and setters for above fields
    // you figure it out
}

Ich würde mich an Klassen mit primitiven Typen und String als Felder halten, bevor die DataBricks-Leute ihre Encoder verbessern. Wenn Sie eine Klasse mit verschachteltem Objekt haben, erstellen Sie eine weitere einfache Java Bean mit allen abgeflachten Feldern, damit Sie den komplexen Typ mithilfe von RDD-Transformationen dem einfacheren zuordnen können. Sicher, es ist ein wenig zusätzliche Arbeit, aber ich kann mir vorstellen, dass es bei der Arbeit mit einem flachen Schema sehr hilfreich sein wird.

Schritt 2: Holen Sie sich Ihren Datensatz vom RDD

SparkSession spark = SparkSession.builder().getOrCreate();
JavaSparkContext jsc = new JavaSparkContext();

List<Fruit> fruitList = ImmutableList.of(
    new Fruit("apple", "red"),
    new Fruit("orange", "orange"),
    new Fruit("grape", "purple"));
JavaRDD<Fruit> fruitJavaRDD = jsc.parallelize(fruitList);


RDD<Fruit> fruitRDD = fruitJavaRDD.rdd();
Encoder<Fruit> fruitBean = Encoders.bean(Fruit.class);
Dataset<Fruit> fruitDataset = spark.createDataset(rdd, bean);

Und voila! Aufschäumen, ausspülen, wiederholen.

Jimmy Da
quelle
Ich würde vorschlagen, darauf hinzuweisen, dass es für einfache Strukturen besser ist, sie in nativen Spark-Typen zu speichern, als sie zu einem Blob zu serialisieren. Sie funktionieren besser über das Python-Gateway, sind in Parkett transparenter und können sogar auf Strukturen derselben Form übertragen werden.
Metasim
1

Für diejenigen, die in meiner Situation mögen, habe ich auch hier meine Antwort gestellt.

Um genau zu sein,

  1. Ich habe 'Setze typisierte Daten' aus SQLContext gelesen. Das ursprüngliche Datenformat ist also DataFrame.

    val sample = spark.sqlContext.sql("select 1 as a, collect_set(1) as b limit 1") sample.show()

    +---+---+ | a| b| +---+---+ | 1|[1]| +---+---+

  2. Konvertieren Sie es dann mit rdd.map () mit dem Typ mutable.WrappedArray in RDD.

    sample .rdd.map(r => (r.getInt(0), r.getAs[mutable.WrappedArray[Int]](1).toSet)) .collect() .foreach(println)

    Ergebnis:

    (1,Set(1))

Taeheon Kwon
quelle
0

Zusätzlich zu den bereits gegebenen Vorschlägen habe ich kürzlich festgestellt, dass Sie Ihre benutzerdefinierte Klasse einschließlich des Merkmals deklarieren können org.apache.spark.sql.catalyst.DefinedByConstructorParams .

Dies funktioniert, wenn die Klasse über einen Konstruktor verfügt, der Typen verwendet, die der ExpressionEncoder verstehen kann, dh primitive Werte und Standardsammlungen. Dies kann nützlich sein, wenn Sie die Klasse nicht als Fallklasse deklarieren können, sie jedoch nicht jedes Mal mit Kryo codieren möchten, wenn sie in einem Datensatz enthalten ist.

Zum Beispiel wollte ich eine Fallklasse deklarieren, die einen Breeze-Vektor enthält. Der einzige Encoder, der das handhaben könnte, wäre normalerweise Kryo. Wenn ich jedoch eine Unterklasse deklarierte, die Breeze DenseVector und DefinedByConstructorParams erweiterte, verstand der ExpressionEncoder, dass sie als Array von Doubles serialisiert werden kann.

So habe ich es erklärt:

class SerializableDenseVector(values: Array[Double]) extends breeze.linalg.DenseVector[Double](values) with DefinedByConstructorParams
implicit def BreezeVectorToSerializable(bv: breeze.linalg.DenseVector[Double]): SerializableDenseVector = bv.asInstanceOf[SerializableDenseVector]

Jetzt kann ich SerializableDenseVectorin einem Datensatz (direkt oder als Teil eines Produkts) einen einfachen ExpressionEncoder und kein Kryo verwenden. Es funktioniert genau wie ein Breeze DenseVector, wird jedoch als Array [Double] serialisiert.

Matt
quelle