Umbenennen von Spaltennamen eines DataFrame in Spark Scala

93

Ich versuche alle Header / Spaltennamen von a DataFramein Spark-Scala zu konvertieren . Ab sofort habe ich folgenden Code, der nur einen einzelnen Spaltennamen ersetzt.

for( i <- 0 to origCols.length - 1) {
  df.withColumnRenamed(
    df.columns(i), 
    df.columns(i).toLowerCase
  );
}
Sam
quelle

Antworten:

237

Wenn die Struktur flach ist:

val df = Seq((1L, "a", "foo", 3.0)).toDF
df.printSchema
// root
//  |-- _1: long (nullable = false)
//  |-- _2: string (nullable = true)
//  |-- _3: string (nullable = true)
//  |-- _4: double (nullable = false)

Das Einfachste, was Sie tun können, ist die Verwendung der folgenden toDFMethode:

val newNames = Seq("id", "x1", "x2", "x3")
val dfRenamed = df.toDF(newNames: _*)

dfRenamed.printSchema
// root
// |-- id: long (nullable = false)
// |-- x1: string (nullable = true)
// |-- x2: string (nullable = true)
// |-- x3: double (nullable = false)

Wenn Sie einzelne Spalten umbenennen möchten , können Sie entweder selectmit alias:

df.select($"_1".alias("x1"))

die leicht auf mehrere Spalten verallgemeinert werden kann:

val lookup = Map("_1" -> "foo", "_3" -> "bar")

df.select(df.columns.map(c => col(c).as(lookup.getOrElse(c, c))): _*)

oder withColumnRenamed:

df.withColumnRenamed("_1", "x1")

die mit verwenden foldLeft, um mehrere Spalten umzubenennen:

lookup.foldLeft(df)((acc, ca) => acc.withColumnRenamed(ca._1, ca._2))

Bei verschachtelten Strukturen ( structs) besteht eine mögliche Option darin, eine ganze Struktur auszuwählen:

val nested = spark.read.json(sc.parallelize(Seq(
    """{"foobar": {"foo": {"bar": {"first": 1.0, "second": 2.0}}}, "id": 1}"""
)))

nested.printSchema
// root
//  |-- foobar: struct (nullable = true)
//  |    |-- foo: struct (nullable = true)
//  |    |    |-- bar: struct (nullable = true)
//  |    |    |    |-- first: double (nullable = true)
//  |    |    |    |-- second: double (nullable = true)
//  |-- id: long (nullable = true)

@transient val foobarRenamed = struct(
  struct(
    struct(
      $"foobar.foo.bar.first".as("x"), $"foobar.foo.bar.first".as("y")
    ).alias("point")
  ).alias("location")
).alias("record")

nested.select(foobarRenamed, $"id").printSchema
// root
//  |-- record: struct (nullable = false)
//  |    |-- location: struct (nullable = false)
//  |    |    |-- point: struct (nullable = false)
//  |    |    |    |-- x: double (nullable = true)
//  |    |    |    |-- y: double (nullable = true)
//  |-- id: long (nullable = true)

Beachten Sie, dass dies Auswirkungen auf nullabilityMetadaten haben kann. Eine andere Möglichkeit ist das Umbenennen durch Casting:

nested.select($"foobar".cast(
  "struct<location:struct<point:struct<x:double,y:double>>>"
).alias("record")).printSchema

// root
//  |-- record: struct (nullable = true)
//  |    |-- location: struct (nullable = true)
//  |    |    |-- point: struct (nullable = true)
//  |    |    |    |-- x: double (nullable = true)
//  |    |    |    |-- y: double (nullable = true)

oder:

import org.apache.spark.sql.types._

nested.select($"foobar".cast(
  StructType(Seq(
    StructField("location", StructType(Seq(
      StructField("point", StructType(Seq(
        StructField("x", DoubleType), StructField("y", DoubleType)))))))))
).alias("record")).printSchema

// root
//  |-- record: struct (nullable = true)
//  |    |-- location: struct (nullable = true)
//  |    |    |-- point: struct (nullable = true)
//  |    |    |    |-- x: double (nullable = true)
//  |    |    |    |-- y: double (nullable = true)
null323
quelle
Hi @ zero323 Bei Verwendung von withColumnRenamed erhalte ich AnalysisException kann CC8 nicht auflösen. 1 'gegebene Eingabespalten ... Es schlägt fehl, obwohl CC8.1 in DataFrame verfügbar ist.
Umesh K
@ u449355 Es ist mir nicht klar, ob es sich um eine verschachtelte Spalte oder eine Spalte mit Punkten handelt. Im späteren Fall sollten Backticks funktionieren (zumindest in einigen grundlegenden Fällen).
Null323
1
was : _*)bedeutet indf.select(df.columns.map(c => col(c).as(lookup.getOrElse(c, c))): _*)
Anton Kim
1
Um Anton Kims Frage zu beantworten: Das : _*ist der sogenannte "Splat" -Operator der Scala. Grundsätzlich wird ein Array-ähnliches Objekt in eine nicht enthaltene Liste aufgelöst. Dies ist nützlich, wenn Sie das Array an eine Funktion übergeben möchten, die eine beliebige Anzahl von Argumenten akzeptiert, jedoch keine Version hat, die a akzeptiert List[]. Wenn Sie mit Perl überhaupt vertraut sind, ist es der Unterschied zwischen some_function(@my_array) # "splatted"und some_function(\@my_array) # not splatted ... in perl the backslash "\" operator returns a reference to a thing.
Mylo Stone
1
Diese Aussage ist für mich wirklich dunkel df.select(df.columns.map(c => col(c).as(lookup.getOrElse(c, c))): _*). Könnten Sie sie bitte zerlegen? vor allem das lookup.getOrElse(c,c)Teil.
Aetos
18

Für diejenigen unter Ihnen, die an der PySpark-Version interessiert sind (tatsächlich ist es in Scala dasselbe - siehe Kommentar unten):

    merchants_df_renamed = merchants_df.toDF(
        'merchant_id', 'category', 'subcategory', 'merchant')

    merchants_df_renamed.printSchema()

Ergebnis:

root
| - Merchant_id: Ganzzahl (nullable = true)
| - Kategorie: Zeichenfolge (nullable = true)
| - Unterkategorie: Zeichenfolge (nullable = true)
| - Händler: Zeichenfolge (nullable = true)

Tagar
quelle
1
Bei der Verwendung toDF()zum Umbenennen von Spalten in DataFrame muss vorsichtig vorgegangen werden. Diese Methode arbeitet viel langsamer als andere. Ich habe DataFrame enthält 100M Datensätze und einfache Zählabfrage dauert ~ 3s, während die gleiche Abfrage mit toDF()Methode ~ 16s dauert. Aber wenn select col AS col_newich die Methode zum Umbenennen verwende, bekomme ich wieder ~ 3s. Mehr als 5 mal schneller! Spark 2.3.2.3
Ihor
6
def aliasAllColumns(t: DataFrame, p: String = "", s: String = ""): DataFrame =
{
  t.select( t.columns.map { c => t.col(c).as( p + c + s) } : _* )
}

Falls dies nicht offensichtlich ist, werden jedem der aktuellen Spaltennamen ein Präfix und ein Suffix hinzugefügt. Dies kann nützlich sein, wenn Sie zwei Tabellen mit einer oder mehreren Spalten mit demselben Namen haben und diese verbinden möchten, die Spalten in der resultierenden Tabelle jedoch weiterhin eindeutig unterscheiden können. Es wäre sicher schön, wenn es eine ähnliche Möglichkeit gäbe, dies in "normalem" SQL zu tun.

Mylo Stone
quelle
mag es sicher, schön und elegant
thebluephantom
1

Angenommen, der Datenrahmen df hat 3 Spalten id1, name1, price1 und Sie möchten sie in id2, name2, price2 umbenennen

val list = List("id2", "name2", "price2")
import spark.implicits._
val df2 = df.toDF(list:_*)
df2.columns.foreach(println)

Ich fand diesen Ansatz in vielen Fällen nützlich.

Jagadeesh Verri
quelle
0

Tow Table Join benennt den verbundenen Schlüssel nicht um

// method 1: create a new DF
day1 = day1.toDF(day1.columns.map(x => if (x.equals(key)) x else s"${x}_d1"): _*)

// method 2: use withColumnRenamed
for ((x, y) <- day1.columns.filter(!_.equals(key)).map(x => (x, s"${x}_d1"))) {
    day1 = day1.withColumnRenamed(x, y)
}

funktioniert!

Colin Wang
quelle