Sie können einer DataFrame
in Spark keine beliebige Spalte hinzufügen . Neue Spalten können nur mithilfe von Literalen erstellt werden (andere Literaltypen werden unter Hinzufügen einer konstanten Spalte in einem Spark-Datenrahmen beschrieben? ).
from pyspark.sql.functions import lit
df = sqlContext.createDataFrame(
[(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))
df_with_x4 = df.withColumn("x4", lit(0))
df_with_x4.show()
## +---+---+-----+---+
## | x1| x2| x3| x4|
## +---+---+-----+---+
## | 1| a| 23.0| 0|
## | 3| B|-23.0| 0|
## +---+---+-----+---+
Transformieren einer vorhandenen Spalte:
from pyspark.sql.functions import exp
df_with_x5 = df_with_x4.withColumn("x5", exp("x3"))
df_with_x5.show()
## +---+---+-----+---+--------------------+
## | x1| x2| x3| x4| x5|
## +---+---+-----+---+--------------------+
## | 1| a| 23.0| 0| 9.744803446248903E9|
## | 3| B|-23.0| 0|1.026187963170189...|
## +---+---+-----+---+--------------------+
enthalten mit join
:
from pyspark.sql.functions import exp
lookup = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
df_with_x6 = (df_with_x5
.join(lookup, col("x1") == col("k"), "leftouter")
.drop("k")
.withColumnRenamed("v", "x6"))
## +---+---+-----+---+--------------------+----+
## | x1| x2| x3| x4| x5| x6|
## +---+---+-----+---+--------------------+----+
## | 1| a| 23.0| 0| 9.744803446248903E9| foo|
## | 3| B|-23.0| 0|1.026187963170189...|null|
## +---+---+-----+---+--------------------+----+
oder generiert mit function / udf:
from pyspark.sql.functions import rand
df_with_x7 = df_with_x6.withColumn("x7", rand())
df_with_x7.show()
## +---+---+-----+---+--------------------+----+-------------------+
## | x1| x2| x3| x4| x5| x6| x7|
## +---+---+-----+---+--------------------+----+-------------------+
## | 1| a| 23.0| 0| 9.744803446248903E9| foo|0.41930610446846617|
## | 3| B|-23.0| 0|1.026187963170189...|null|0.37801881545497873|
## +---+---+-----+---+--------------------+----+-------------------+
In Bezug auf die Leistung werden integrierte Funktionen ( pyspark.sql.functions
), die dem Catalyst-Ausdruck zugeordnet sind, normalerweise benutzerdefinierten Python-Funktionen vorgezogen.
Wenn Sie den Inhalt einer beliebigen RDD als Spalte hinzufügen möchten, können Sie dies tun
So fügen Sie eine Spalte mithilfe einer UDF hinzu:
quelle
Für Spark 2.0
quelle
df = df.select('*', (df.age + 10).alias('agePlusTen'))
, fügen Sie effektiv eine beliebige Spalte hinzu, da @ zero323 uns oben gewarnt hat, dass dies unmöglich ist, es sei denn, es stimmt etwas nicht in Spark, in Pandas ist dies der Standardweg.df.select('*', df.age + 10, df.age + 20)
Es gibt mehrere Möglichkeiten, eine neue Spalte in pySpark hinzuzufügen.
Erstellen wir zunächst einen einfachen DataFrame.
Versuchen wir nun, den Spaltenwert zu verdoppeln und in einer neuen Spalte zu speichern. PFB wenige verschiedene Ansätze, um das gleiche zu erreichen.
Weitere Beispiele und Erklärungen zu den Funktionen von spark DataFrame finden Sie in meinem Blog .
Ich hoffe das hilft.
quelle
Sie können eine neue definieren,
udf
wenn Sie Folgendes hinzufügencolumn_name
:quelle
quelle
StringType()
.Ich möchte ein allgemeines Beispiel für einen sehr ähnlichen Anwendungsfall anbieten:
Anwendungsfall: Ich habe eine CSV bestehend aus:
Ich muss einige Transformationen durchführen und die endgültige CSV muss so aussehen
Ich muss dies tun, da dies das von einem Modell definierte Schema ist und meine endgültigen Daten mit SQL Bulk Inserts und dergleichen interoperabel sein müssen.
so:
1) Ich habe die Original-CSV mit spark.read gelesen und nenne sie "df".
2) Ich mache etwas mit den Daten.
3) Ich füge die Nullspalten mit diesem Skript hinzu:
Auf diese Weise können Sie Ihr Schema nach dem Laden einer CSV strukturieren (würde auch zum Neuordnen von Spalten funktionieren, wenn Sie dies für viele Tabellen tun müssen).
quelle
Der einfachste Weg, eine Spalte hinzuzufügen, ist die Verwendung von "withColumn". Da der Datenrahmen mit sqlContext erstellt wird, müssen Sie das Schema angeben oder es kann standardmäßig im Dataset verfügbar sein. Wenn das Schema angegeben wird, wird die Arbeitslast bei jeder Änderung mühsam.
Unten finden Sie ein Beispiel, das Sie berücksichtigen können:
quelle
Mit den folgenden Schritten können wir DataFrame direkt zusätzliche Spalten hinzufügen:
quelle