Extrahieren Sie die Spaltenwerte von Dataframe als Liste in Apache Spark

86

Ich möchte eine Zeichenfolgenspalte eines Datenrahmens in eine Liste konvertieren. Was ich in der DataframeAPI finden kann, ist RDD. Daher habe ich versucht, es zuerst wieder in RDD zu konvertieren und dann die toArrayFunktion auf das RDD anzuwenden . In diesem Fall funktionieren Länge und SQL einwandfrei. Das Ergebnis von RDD hat jedoch eckige Klammern um jedes Element wie dieses [A00001]. Ich habe mich gefragt, ob es eine geeignete Möglichkeit gibt, eine Spalte in eine Liste umzuwandeln oder die eckigen Klammern zu entfernen.

Anregungen wäre dankbar. Vielen Dank!

SH Y.
quelle

Antworten:

116

Dies sollte die Sammlung zurückgeben, die eine einzelne Liste enthält:

dataFrame.select("YOUR_COLUMN_NAME").rdd.map(r => r(0)).collect()

Ohne die Zuordnung erhalten Sie nur ein Zeilenobjekt, das jede Spalte aus der Datenbank enthält.

Denken Sie daran, dass Sie dadurch wahrscheinlich eine Liste mit beliebigen Typen erhalten. ÏWenn Sie den Ergebnistyp angeben möchten, können Sie .asInstanceOf [YOUR_TYPE] für die r => r(0).asInstanceOf[YOUR_TYPE]Zuordnung verwenden

PS Aufgrund der automatischen Konvertierung können Sie das .rddTeil überspringen .

Niemand
quelle
3
Aus irgendeinem seltsamen Grund funktioniert es umgekehrt (Spark 2.1.0) collect().map(r => r(0))- hat diese Reihenfolge irgendwelche Nachteile?
Boern
Kann langsamer sein - Ihre Lösung sammelt zuerst alle Daten auf dem Treiber und anschließend die Zuordnung auf dem Treiber (ohne Hilfe des Testaments), wobei nur die Verarbeitungsleistung eines einzelnen Treibers verwendet wird.
Niemand
72

Mit Spark 2.x und Scala 2.11

Ich würde mir 3 Möglichkeiten vorstellen, um Werte einer bestimmten Spalte in List umzuwandeln.

Gemeinsame Codefragmente für alle Ansätze

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate    
import spark.implicits._ // for .toDF() method

val df = Seq(
    ("first", 2.0),
    ("test", 1.5), 
    ("choose", 8.0)
  ).toDF("id", "val")

Ansatz 1

df.select("id").collect().map(_(0)).toList
// res9: List[Any] = List(one, two, three)

Was passiert jetzt? Wir sammeln Daten für den Treiber mit collect()und wählen aus jedem Datensatz das Element Null aus.

Dies könnte keine hervorragende Möglichkeit sein. Verbessern wir es mit dem nächsten Ansatz.


Ansatz 2

df.select("id").rdd.map(r => r(0)).collect.toList 
//res10: List[Any] = List(one, two, three)

Wie ist es besser Wir haben die Last der Kartentransformation auf die Worker verteilt und nicht auf einen einzelnen Treiber.

Ich weiß, rdd.map(r => r(0))scheint dir nicht elegant zu sein. Lassen Sie uns dies im nächsten Ansatz ansprechen.


Ansatz 3

df.select("id").map(r => r.getString(0)).collect.toList 
//res11: List[String] = List(one, two, three)

Hier konvertieren wir DataFrame nicht in RDD. Schauen Sie sich an, dass mapdies aufgrund von Encoderproblemen in DataFrame nicht als vorheriger Ansatz akzeptiert r => r(0)(oder _(0)) wird. Verwenden r => r.getString(0)Sie also am Ende und es wird in den nächsten Versionen von Spark behandelt.

Fazit

Alle Optionen liefern die gleiche Ausgabe, aber 2 und 3 sind effektiv, schließlich ist die dritte effektiv und elegant (würde ich denken).

Databricks Notizbuch

mrsrinivas
quelle
24

Ich weiß, dass die gegebene und angeforderte Antwort für Scala angenommen wird, daher stelle ich nur einen kleinen Ausschnitt aus Python-Code zur Verfügung, falls ein PySpark-Benutzer neugierig ist. Die Syntax ähnelt der angegebenen Antwort, aber um die Liste richtig zu löschen, muss ich in der Zuordnungsfunktion ein zweites Mal auf den Spaltennamen verweisen, und ich benötige die select-Anweisung nicht.

dh ein DataFrame, der eine Spalte mit dem Namen "Raw" enthält

Um jeden Zeilenwert in "Raw" als Liste zu kombinieren, wobei jeder Eintrag ein Zeilenwert aus "Raw" ist, verwende ich einfach:

MyDataFrame.rdd.map(lambda x: x.Raw).collect()
Abby Sobh
quelle
4
Dies gibt eine Liste von Zeilenobjekten. Was ist, wenn Sie eine Liste der Werte wünschen?
ThatDataGuy
Dies gibt eine Liste von Werten.
Abby Sobh
Danke, dass du das geteilt hast! Das funktioniert bei mir großartig. Ich frage mich nur, ob es einen Weg gibt, dies zu beschleunigen. Es läuft ziemlich langsam
Mojgan Mazouchi,
5

Versuchen Sie dies in Scala und Spark 2+ (vorausgesetzt, Ihr Spaltenname lautet "s"): df.select('s).as[String].collect

kanielc
quelle
3
sqlContext.sql(" select filename from tempTable").rdd.map(r => r(0)).collect.toList.foreach(out_streamfn.println) //remove brackets

es funktioniert perfekt

Shaina Raza
quelle
1
List<String> whatever_list = df.toJavaRDD().map(new Function<Row, String>() {
    public String call(Row row) {
        return row.getAs("column_name").toString();
    }
}).collect();

logger.info(String.format("list is %s",whatever_list)); //verification

Da niemand eine Lösung in Java (Real Programming Language) gegeben hat, kann ich mich später bedanken

user12910640
quelle
0
from pyspark.sql.functions import col

df.select(col("column_name")).collect()

Hier sammeln ist Funktionen, die es wiederum in Liste konvertieren. Verwenden Sie die Liste für den riesigen Datensatz. Dies verringert die Leistung. Es ist gut, die Daten zu überprüfen.

Amarnath Pickel
quelle
0

Dies ist Java-Antwort.

df.select("id").collectAsList();
Vahbuna
quelle
0

Eine aktualisierte Lösung, mit der Sie eine Liste erhalten:

dataFrame.select("YOUR_COLUMN_NAME").map(r => r.getString(0)).collect.toList
Athanasios Tsiaras
quelle