So konvertieren Sie ein rdd-Objekt in einen Datenrahmen in Spark

139

Wie kann ich eine RDD konvertieren ( org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]) zu einem Datenrahmen org.apache.spark.sql.DataFrame. Ich habe einen Datenrahmen mit rdd konvertiert .rdd. Nach der Verarbeitung möchte ich es wieder im Datenrahmen haben. Wie kann ich das machen ?

user568109
quelle
Weg, um dies in Spark 2.x
mrsrinivas

Antworten:

88

SqlContexthat eine Reihe von createDataFrameMethoden, die eine DataFramebestimmte an erstellen RDD. Ich kann mir vorstellen, dass eine davon für Ihren Kontext geeignet ist.

Beispielsweise:

def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame

Erstellt einen DataFrame aus einer RDD mit Zeilen unter Verwendung des angegebenen Schemas.

Der archetypische Paulus
quelle
92

Dieser Code funktioniert perfekt ab Spark 2.x mit Scala 2.11

Importieren Sie die erforderlichen Klassen

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

SparkSessionObjekt erstellen und hier ist esspark

val spark: SparkSession = SparkSession.builder.master("local").getOrCreate
val sc = spark.sparkContext // Just used to create test RDDs

Lass es uns RDDschaffenDataFrame

val rdd = sc.parallelize(
  Seq(
    ("first", Array(2.0, 1.0, 2.1, 5.4)),
    ("test", Array(1.5, 0.5, 0.9, 3.7)),
    ("choose", Array(8.0, 2.9, 9.1, 2.5))
  )
)

Methode 1

Verwenden von SparkSession.createDataFrame(RDD obj).

val dfWithoutSchema = spark.createDataFrame(rdd)

dfWithoutSchema.show()
+------+--------------------+
|    _1|                  _2|
+------+--------------------+
| first|[2.0, 1.0, 2.1, 5.4]|
|  test|[1.5, 0.5, 0.9, 3.7]|
|choose|[8.0, 2.9, 9.1, 2.5]|
+------+--------------------+

Methode 2

Die Verwendung SparkSession.createDataFrame(RDD obj)und Spaltennamen angeben.

val dfWithSchema = spark.createDataFrame(rdd).toDF("id", "vals")

dfWithSchema.show()
+------+--------------------+
|    id|                vals|
+------+--------------------+
| first|[2.0, 1.0, 2.1, 5.4]|
|  test|[1.5, 0.5, 0.9, 3.7]|
|choose|[8.0, 2.9, 9.1, 2.5]|
+------+--------------------+

Methode 3 (Aktuelle Antwort auf die Frage)

Auf diese Weise muss die Eingabe rddvom Typ sein RDD[Row].

val rowsRdd: RDD[Row] = sc.parallelize(
  Seq(
    Row("first", 2.0, 7.0),
    Row("second", 3.5, 2.5),
    Row("third", 7.0, 5.9)
  )
)

Erstellen Sie das Schema

val schema = new StructType()
  .add(StructField("id", StringType, true))
  .add(StructField("val1", DoubleType, true))
  .add(StructField("val2", DoubleType, true))

Wenden Sie nun beide rowsRddund schemaancreateDataFrame()

val df = spark.createDataFrame(rowsRdd, schema)

df.show()
+------+----+----+
|    id|val1|val2|
+------+----+----+
| first| 2.0| 7.0|
|second| 3.5| 2.5|
| third| 7.0| 5.9|
+------+----+----+
mrsrinivas
quelle
2
Vielen Dank, dass Sie die verschiedenen Verwendungsmöglichkeiten von createDataFrame auf verständliche Weise
aufgezeigt haben
Die dritte Methode ist hilfreich für Datenbausteine, da andere nicht funktionieren und einen Fehler
melden
67

Angenommen, Ihre RDD [Zeile] heißt rdd, können Sie Folgendes verwenden:

val sqlContext = new SQLContext(sc) 
import sqlContext.implicits._
rdd.toDF()
dtjones
quelle
26
Ich denke, es funktioniert nicht für RDD [Row]. Vermisse ich etwas
Daniel de Paula
4
Da Spark 2.0 SQLContext durch SparkSession ersetzt wird, wird die Klasse aus Gründen der Abwärtskompatibilität (Scaladoc) in der Codebasis beibehalten. Wenn Sie es verwenden, wird eine Abwertungswarnung ausgegeben.
Tomaskazemekas
18

Hinweis: Diese Antwort wurde ursprünglich hier veröffentlicht

Ich poste diese Antwort, weil ich zusätzliche Details zu den verfügbaren Optionen mitteilen möchte, die ich in den anderen Antworten nicht gefunden habe


Um einen DataFrame aus einer RDD von Zeilen zu erstellen, gibt es zwei Hauptoptionen:

1) Wie bereits erwähnt, können Sie verwenden, toDF()welche von importiert werden können import sqlContext.implicits._. Dieser Ansatz funktioniert jedoch nur für die folgenden Arten von RDDs:

  • RDD[Int]
  • RDD[Long]
  • RDD[String]
  • RDD[T <: scala.Product]

(Quelle: Scaladoc des SQLContext.implicitsObjekts)

Die letzte Signatur bedeutet tatsächlich, dass sie für eine RDD von Tupeln oder eine RDD von Fallklassen funktionieren kann (da Tupel und Fallklassen Unterklassen von sind scala.Product).

Um diesen Ansatz für ein zu verwenden RDD[Row], müssen Sie ihn einem zuordnen RDD[T <: scala.Product]. Dies kann erreicht werden, indem jede Zeile einer benutzerdefinierten Fallklasse oder einem Tupel zugeordnet wird, wie in den folgenden Codeausschnitten:

val df = rdd.map({ 
  case Row(val1: String, ..., valN: Long) => (val1, ..., valN)
}).toDF("col1_name", ..., "colN_name")

oder

case class MyClass(val1: String, ..., valN: Long = 0L)
val df = rdd.map({ 
  case Row(val1: String, ..., valN: Long) => MyClass(val1, ..., valN)
}).toDF("col1_name", ..., "colN_name")

Der Hauptnachteil dieses Ansatzes (meiner Meinung nach) besteht darin, dass Sie das Schema des resultierenden DataFrame in der Kartenfunktion spaltenweise explizit festlegen müssen. Vielleicht kann dies programmgesteuert erfolgen, wenn Sie das Schema nicht im Voraus kennen, aber dort kann es etwas chaotisch werden. Alternativ gibt es also eine andere Option:


2) Sie können createDataFrame(rowRDD: RDD[Row], schema: StructType)wie in der akzeptierten Antwort verwenden, die im SQLContext- Objekt verfügbar ist . Beispiel für die Konvertierung einer RDD eines alten DataFrame:

val rdd = oldDF.rdd
val newDF = oldDF.sqlContext.createDataFrame(rdd, oldDF.schema)

Beachten Sie, dass keine Schemaspalte explizit festgelegt werden muss. Wir verwenden das Schema des alten DF wieder, das von StructTypeKlasse ist und leicht erweitert werden kann. Dieser Ansatz ist jedoch manchmal nicht möglich und kann in einigen Fällen weniger effizient sein als der erste.

Daniel de Paula
quelle
Vielen Dank für das Detailimport sqlContext.implicits.
Javadba
Bitte veröffentlichen Sie in Zukunft keine identischen Antworten auf mehrere Fragen. Wenn es sich bei den Fragen um Duplikate handelt, geben Sie eine gute Antwort ein und stimmen Sie ab oder markieren Sie sie, um die andere Frage als Duplikat zu schließen. Wenn die Frage kein Duplikat ist, passen Sie Ihre Antworten auf die Frage an. Siehe Wie schreibe ich eine gute Antwort? .
15

Angenommen, Sie haben eine DataFrameund möchten einige Änderungen an den Felddaten vornehmen, indem Sie diese in konvertieren RDD[Row].

val aRdd = aDF.map(x=>Row(x.getAs[Long]("id"),x.getAs[List[String]]("role").head))

Um zurück DataFramevon zu konvertieren, müssen RDDwir den Strukturtyp des definieren RDD.

Wenn der Datentyp war Long , wird er wie LongTypein der Struktur.

Wenn Stringdann StringTypein Struktur.

val aStruct = new StructType(Array(StructField("id",LongType,nullable = true),StructField("role",StringType,nullable = true)))

Jetzt können Sie die RDD mithilfe der Methode createDataFrame in DataFrame konvertieren .

val aNamedDF = sqlContext.createDataFrame(aRdd,aStruct)
Ajay Gupta
quelle
7

Hier ist ein einfaches Beispiel für die Konvertierung Ihrer Liste in Spark RDD und die anschließende Konvertierung dieser Spark RDD in Dataframe.

Bitte beachten Sie, dass ich die scala REPL von Spark-Shell verwendet habe, um den folgenden Code auszuführen. Hier ist sc eine Instanz von SparkContext, die implizit in Spark-Shell verfügbar ist. Hoffe es beantwortet deine Frage.

scala> val numList = List(1,2,3,4,5)
numList: List[Int] = List(1, 2, 3, 4, 5)

scala> val numRDD = sc.parallelize(numList)
numRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[80] at parallelize at <console>:28

scala> val numDF = numRDD.toDF
numDF: org.apache.spark.sql.DataFrame = [_1: int]

scala> numDF.show
+---+
| _1|
+---+
|  1|
|  2|
|  3|
|  4|
|  5|
+---+
Rashmit Rathod
quelle
Eine lustige Tatsache: Dies funktioniert nicht mehr, wenn Ihre Liste Double anstelle von int (oder Long, String, <: Product) ist.
Rick Moritz
Antwortet nicht auf das OP: das über RDD [Row] spricht
javadba
6

Methode 1: (Scala)

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
val df_2 = sc.parallelize(Seq((1L, 3.0, "a"), (2L, -1.0, "b"), (3L, 0.0, "c"))).toDF("x", "y", "z")

Methode 2: (Scala)

case class temp(val1: String,val3 : Double) 

val rdd = sc.parallelize(Seq(
  Row("foo",  0.5), Row("bar",  0.0)
))
val rows = rdd.map({case Row(val1:String,val3:Double) => temp(val1,val3)}).toDF()
rows.show()

Methode 1: (Python)

from pyspark.sql import Row
l = [('Alice',2)]
Person = Row('name','age')
rdd = sc.parallelize(l)
person = rdd.map(lambda r:Person(*r))
df2 = sqlContext.createDataFrame(person)
df2.show()

Methode 2: (Python)

from pyspark.sql.types import * 
l = [('Alice',2)]
rdd = sc.parallelize(l)
schema =  StructType([StructField ("name" , StringType(), True) , 
StructField("age" , IntegerType(), True)]) 
df3 = sqlContext.createDataFrame(rdd, schema) 
df3.show()

Extrahierte den Wert aus dem Zeilenobjekt und wandte dann die case-Klasse an, um rdd in DF zu konvertieren

val temp1 = attrib1.map{case Row ( key: Int ) => s"$key" }
val temp2 = attrib2.map{case Row ( key: Int) => s"$key" }

case class RLT (id: String, attrib_1 : String, attrib_2 : String)
import hiveContext.implicits._

val df = result.map{ s => RLT(s(0),s(1),s(2)) }.toDF
Aravind Krishnakumar
quelle
4

Bei neueren Versionen von spark (2.0+)

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql._
import org.apache.spark.sql.types._

val spark = SparkSession
  .builder()
  .getOrCreate()
import spark.implicits._

val dfSchema = Seq("col1", "col2", "col3")
rdd.toDF(dfSchema: _*)
Ozzieisaacs
quelle
1
sparkSession ist nur ein Wrapper für sqlContext, hiveContext
Archit
1
One needs to create a schema, and attach it to the Rdd.

Angenommen, Val Spark ist ein Produkt eines SparkSession.builder ...

    import org.apache.spark._
    import org.apache.spark.sql._       
    import org.apache.spark.sql.types._

    /* Lets gin up some sample data:
     * As RDD's and dataframes can have columns of differing types, lets make our
     * sample data a three wide, two tall, rectangle of mixed types.
     * A column of Strings, a column of Longs, and a column of Doubules 
     */
    val arrayOfArrayOfAnys = Array.ofDim[Any](2,3)
    arrayOfArrayOfAnys(0)(0)="aString"
    arrayOfArrayOfAnys(0)(1)=0L
    arrayOfArrayOfAnys(0)(2)=3.14159
    arrayOfArrayOfAnys(1)(0)="bString"
    arrayOfArrayOfAnys(1)(1)=9876543210L
    arrayOfArrayOfAnys(1)(2)=2.71828

    /* The way to convert an anything which looks rectangular, 
     * (Array[Array[String]] or Array[Array[Any]] or Array[Row], ... ) into an RDD is to 
     * throw it into sparkContext.parallelize.
     * http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext shows
     * the parallelize definition as 
     *     def parallelize[T](seq: Seq[T], numSlices: Int = defaultParallelism)
     * so in our case our ArrayOfArrayOfAnys is treated as a sequence of ArraysOfAnys.
     * Will leave the numSlices as the defaultParallelism, as I have no particular cause to change it. 
     */
    val rddOfArrayOfArrayOfAnys=spark.sparkContext.parallelize(arrayOfArrayOfAnys)

    /* We'll be using the sqlContext.createDataFrame to add a schema our RDD.
     * The RDD which goes into createDataFrame is an RDD[Row] which is not what we happen to have.
     * To convert anything one tall and several wide into a Row, one can use Row.fromSeq(thatThing.toSeq)
     * As we have an RDD[somethingWeDontWant], we can map each of the RDD rows into the desired Row type. 
     */     
    val rddOfRows=rddOfArrayOfArrayOfAnys.map(f=>
        Row.fromSeq(f.toSeq)
    )

    /* Now to construct our schema. This needs to be a StructType of 1 StructField per column in our dataframe.
     * https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.types.StructField shows the definition as
     *   case class StructField(name: String, dataType: DataType, nullable: Boolean = true, metadata: Metadata = Metadata.empty)
     * Will leave the two default values in place for each of the columns:
     *        nullability as true, 
     *        metadata as an empty Map[String,Any]
     *   
     */

    val schema = StructType(
        StructField("colOfStrings", StringType) ::
        StructField("colOfLongs"  , LongType  ) ::
        StructField("colOfDoubles", DoubleType) ::
        Nil
    )

    val df=spark.sqlContext.createDataFrame(rddOfRows,schema)
    /*
     *      +------------+----------+------------+
     *      |colOfStrings|colOfLongs|colOfDoubles|
     *      +------------+----------+------------+
     *      |     aString|         0|     3.14159|
     *      |     bString|9876543210|     2.71828|
     *      +------------+----------+------------+
    */ 
    df.show 

Gleiche Schritte, aber mit weniger Wertdeklarationen:

    val arrayOfArrayOfAnys=Array(
        Array("aString",0L         ,3.14159),
        Array("bString",9876543210L,2.71828)
    )

    val rddOfRows=spark.sparkContext.parallelize(arrayOfArrayOfAnys).map(f=>Row.fromSeq(f.toSeq))

    /* If one knows the datatypes, for instance from JDBC queries as to RDBC column metadata:
     * Consider constructing the schema from an Array[StructField].  This would allow looping over 
     * the columns, with a match statement applying the appropriate sql datatypes as the second
     *  StructField arguments.   
     */
    val sf=new Array[StructField](3)
    sf(0)=StructField("colOfStrings",StringType)
    sf(1)=StructField("colOfLongs"  ,LongType  )
    sf(2)=StructField("colOfDoubles",DoubleType)        
    val df=spark.sqlContext.createDataFrame(rddOfRows,StructType(sf.toList))
    df.show
teserecter
quelle
1

Ich habe versucht , die Lösung mit dem erklären Wortzählimpuls Problem . 1. Lesen Sie die Datei mit sc

  1. Wortanzahl erzeugen
  2. Methoden zum Erstellen von DF

    • rdd.toDF-Methode
    • rdd.toDF ("Wort", "Anzahl")
      • spark.createDataFrame (rdd, schema)

    Datei mit Funken lesen

    val rdd=sc.textFile("D://cca175/data/")  

    Rdd zu Dataframe

    val df = sc.textFile ("D: // cca175 / data /") .toDF ("t1") df.show

    Methode 1

    Erstellen Sie eine Wortanzahl RDD für Dataframe

    val df=rdd.flatMap(x=>x.split(" ")).map(x=>(x,1)).reduceByKey((x,y)=>(x+y)).toDF("word","count")

    Methode2

    Erstellen Sie einen Datenrahmen aus Rdd

    val df=spark.createDataFrame(wordRdd) 
    # with header   
    val df=spark.createDataFrame(wordRdd).toDF("word","count")  df.show

    Methode3

    Schema definieren

    import org.apache.spark.sql.types._

    val schema = new StructType (). add (StructField ("word", StringType, true)). add (StructField ("count", StringType, true))

    Erstellen Sie RowRDD

    import org.apache.spark.sql.Row
    val rowRdd=wordRdd.map(x=>(Row(x._1,x._2)))     

    Erstellen Sie DataFrame aus RDD mit Schema

    val df = spark.createDataFrame (rowRdd, schema)
    df.show

Priyanshu Singh
quelle
0

Um ein Array [Row] in DataFrame oder Dataset zu konvertieren, funktioniert Folgendes elegant:

Angenommen, Schema ist dann der StructType für die Zeile

val rows: Array[Row]=...
implicit val encoder = RowEncoder.apply(schema)
import spark.implicits._
rows.toDS
Tom
quelle