Wie kann ich eine RDD konvertieren ( org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
) zu einem Datenrahmen org.apache.spark.sql.DataFrame
. Ich habe einen Datenrahmen mit rdd konvertiert .rdd
. Nach der Verarbeitung möchte ich es wieder im Datenrahmen haben. Wie kann ich das machen ?
scala
apache-spark
apache-spark-sql
rdd
user568109
quelle
quelle
Antworten:
SqlContext
hat eine Reihe voncreateDataFrame
Methoden, die eineDataFrame
bestimmte an erstellenRDD
. Ich kann mir vorstellen, dass eine davon für Ihren Kontext geeignet ist.Beispielsweise:
quelle
Dieser Code funktioniert perfekt ab Spark 2.x mit Scala 2.11
Importieren Sie die erforderlichen Klassen
SparkSession
Objekt erstellen und hier ist esspark
Lass es uns
RDD
schaffenDataFrame
Methode 1
Verwenden von
SparkSession.createDataFrame(RDD obj)
.Methode 2
Die Verwendung
SparkSession.createDataFrame(RDD obj)
und Spaltennamen angeben.Methode 3 (Aktuelle Antwort auf die Frage)
Auf diese Weise muss die Eingabe
rdd
vom Typ seinRDD[Row]
.Erstellen Sie das Schema
Wenden Sie nun beide
rowsRdd
undschema
ancreateDataFrame()
quelle
Angenommen, Ihre RDD [Zeile] heißt rdd, können Sie Folgendes verwenden:
quelle
Hinweis: Diese Antwort wurde ursprünglich hier veröffentlicht
Ich poste diese Antwort, weil ich zusätzliche Details zu den verfügbaren Optionen mitteilen möchte, die ich in den anderen Antworten nicht gefunden habe
Um einen DataFrame aus einer RDD von Zeilen zu erstellen, gibt es zwei Hauptoptionen:
1) Wie bereits erwähnt, können Sie verwenden,
toDF()
welche von importiert werden könnenimport sqlContext.implicits._
. Dieser Ansatz funktioniert jedoch nur für die folgenden Arten von RDDs:RDD[Int]
RDD[Long]
RDD[String]
RDD[T <: scala.Product]
(Quelle: Scaladoc des
SQLContext.implicits
Objekts)Die letzte Signatur bedeutet tatsächlich, dass sie für eine RDD von Tupeln oder eine RDD von Fallklassen funktionieren kann (da Tupel und Fallklassen Unterklassen von sind
scala.Product
).Um diesen Ansatz für ein zu verwenden
RDD[Row]
, müssen Sie ihn einem zuordnenRDD[T <: scala.Product]
. Dies kann erreicht werden, indem jede Zeile einer benutzerdefinierten Fallklasse oder einem Tupel zugeordnet wird, wie in den folgenden Codeausschnitten:oder
Der Hauptnachteil dieses Ansatzes (meiner Meinung nach) besteht darin, dass Sie das Schema des resultierenden DataFrame in der Kartenfunktion spaltenweise explizit festlegen müssen. Vielleicht kann dies programmgesteuert erfolgen, wenn Sie das Schema nicht im Voraus kennen, aber dort kann es etwas chaotisch werden. Alternativ gibt es also eine andere Option:
2) Sie können
createDataFrame(rowRDD: RDD[Row], schema: StructType)
wie in der akzeptierten Antwort verwenden, die im SQLContext- Objekt verfügbar ist . Beispiel für die Konvertierung einer RDD eines alten DataFrame:Beachten Sie, dass keine Schemaspalte explizit festgelegt werden muss. Wir verwenden das Schema des alten DF wieder, das von
StructType
Klasse ist und leicht erweitert werden kann. Dieser Ansatz ist jedoch manchmal nicht möglich und kann in einigen Fällen weniger effizient sein als der erste.quelle
import sqlContext.implicits.
Angenommen, Sie haben eine
DataFrame
und möchten einige Änderungen an den Felddaten vornehmen, indem Sie diese in konvertierenRDD[Row]
.Um zurück
DataFrame
von zu konvertieren, müssenRDD
wir den Strukturtyp des definierenRDD
.Wenn der Datentyp war
Long
, wird er wieLongType
in der Struktur.Wenn
String
dannStringType
in Struktur.Jetzt können Sie die RDD mithilfe der Methode createDataFrame in DataFrame konvertieren .
quelle
Hier ist ein einfaches Beispiel für die Konvertierung Ihrer Liste in Spark RDD und die anschließende Konvertierung dieser Spark RDD in Dataframe.
Bitte beachten Sie, dass ich die scala REPL von Spark-Shell verwendet habe, um den folgenden Code auszuführen. Hier ist sc eine Instanz von SparkContext, die implizit in Spark-Shell verfügbar ist. Hoffe es beantwortet deine Frage.
quelle
Methode 1: (Scala)
Methode 2: (Scala)
Methode 1: (Python)
Methode 2: (Python)
Extrahierte den Wert aus dem Zeilenobjekt und wandte dann die case-Klasse an, um rdd in DF zu konvertieren
quelle
Bei neueren Versionen von spark (2.0+)
quelle
Angenommen, Val Spark ist ein Produkt eines SparkSession.builder ...
Gleiche Schritte, aber mit weniger Wertdeklarationen:
quelle
Ich habe versucht , die Lösung mit dem erklären Wortzählimpuls Problem . 1. Lesen Sie die Datei mit sc
Methoden zum Erstellen von DF
Datei mit Funken lesen
Rdd zu Dataframe
val df = sc.textFile ("D: // cca175 / data /") .toDF ("t1") df.show
Methode 1
Erstellen Sie eine Wortanzahl RDD für Dataframe
Methode2
Erstellen Sie einen Datenrahmen aus Rdd
Methode3
Schema definieren
import org.apache.spark.sql.types._
val schema = new StructType (). add (StructField ("word", StringType, true)). add (StructField ("count", StringType, true))
Erstellen Sie RowRDD
Erstellen Sie DataFrame aus RDD mit Schema
val df = spark.createDataFrame (rowRdd, schema)
df.show
quelle
Um ein Array [Row] in DataFrame oder Dataset zu konvertieren, funktioniert Folgendes elegant:
Angenommen, Schema ist dann der StructType für die Zeile
quelle