Aktualisieren einer Datenrahmenspalte in Spark

72

Bei Betrachtung der neuen Spark-Datenrahmen-API ist unklar, ob es möglich ist, Datenrahmenspalten zu ändern.

Wie würde ich mich über einen Wert in der Zeile zu ändern xSpalte yeines Datenrahmens?

In pandasdiesem wäredf.ix[x,y] = new_value

Bearbeiten: Wenn Sie das unten Gesagte konsolidieren, können Sie den vorhandenen Datenrahmen nicht ändern, da er unveränderlich ist. Sie können jedoch einen neuen Datenrahmen mit den gewünschten Änderungen zurückgeben.

Wenn Sie nur einen Wert in einer Spalte ersetzen möchten, der auf einer Bedingung basiert, wie z np.where.

from pyspark.sql import functions as F

update_func = (F.when(F.col('update_col') == replace_val, new_value)
                .otherwise(F.col('update_col')))
df = df.withColumn('new_column_name', update_func)

Wenn Sie eine Operation für eine Spalte ausführen und eine neue Spalte erstellen möchten, die dem Datenrahmen hinzugefügt wird:

import pyspark.sql.functions as F
import pyspark.sql.types as T

def my_func(col):
    do stuff to column here
    return transformed_value

# if we assume that my_func returns a string
my_udf = F.UserDefinedFunction(my_func, T.StringType())

df = df.withColumn('new_column_name', my_udf('update_col'))

Wenn die neue Spalte denselben Namen wie die alte Spalte haben soll, können Sie den zusätzlichen Schritt hinzufügen:

df = df.drop('update_col').withColumnRenamed('new_column_name', 'update_col')
Luke
quelle
Wenn Sie über den Index auf den DataFrame zugreifen möchten, müssen Sie zuerst einen Index erstellen. Siehe z . B. stackoverflow.com/questions/26828815/… . Oder fügen Sie eine Indexspalte mit Ihrem eigenen Index hinzu.
Fanfabbb

Antworten:

70

Während Sie eine Spalte als solche nicht ändern können, können Sie eine Spalte bearbeiten und einen neuen DataFrame zurückgeben, der diese Änderung widerspiegelt. Dazu erstellen Sie zuerst eine UserDefinedFunctionImplementierung der anzuwendenden Operation und wenden diese Funktion dann selektiv nur auf die Zielspalte an. In Python:

from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType

name = 'target_column'
udf = UserDefinedFunction(lambda x: 'new_value', StringType())
new_df = old_df.select(*[udf(column).alias(name) if column == name else column for column in old_df.columns])

new_dfhat jetzt das gleiche Schema wie old_df(vorausgesetzt, es old_df.target_columnwar auch vom Typ StringType), aber alle Werte in der Spalte target_columnsind new_value.

Karlson
quelle
1
Dies ist eine tatsächliche Antwort auf das Problem, danke! Doch die Funkenjobs sind für mich noch nicht beendet, alle Testamentsvollstrecker verlieren sich. Können Sie sich einen alternativen Weg vorstellen? Ich benutze es mit einer etwas komplexeren UDF, bei der ich Transformationen in Strings durchführe. Es gibt keine pandasähnliche Syntax wie new_df = old_df.col1.apply (Lambda x: func (x))?
Fanfabbb
24
Es gibt auch:new_df = old_df.withColumn('target_column', udf(df.name))
Fanfabbb
2
Ja, das sollte gut funktionieren. Beachten Sie, dass UDFs nur Spalten als Parameter verwenden können. Wenn Sie andere Daten an die Funktion übergeben möchten, müssen Sie diese zuerst teilweise anwenden.
Karlson
1
@KatyaHandler Wenn Sie eine Spalte nur duplizieren möchten, können Sie sie einfach zweimal auswählen: df.select([df[col], df[col].alias('same_column')])Wo colist der Name der Spalte, die Sie duplizieren möchten? Mit der neuesten Spark-Version können viele Dinge, für die ich UDFs verwendet habe, mit den in definierten Funktionen ausgeführt werden pyspark.sql.functions. Die UDF-Leistung in Pyspark ist wirklich schlecht, daher lohnt es sich möglicherweise, einen Blick darauf zu werfen
karlson
1
es ist StringType nicht Stringtypeinudf = UserDefinedFunction(lambda x: 'new_value', Stringtype())
Namit Juneja
47

In der Regel möchten wir beim Aktualisieren einer Spalte einen alten Wert einem neuen Wert zuordnen. Hier ist eine Möglichkeit, dies im Pyspark ohne UDFs zu tun:

# update df[update_col], mapping old_value --> new_value
from pyspark.sql import functions as F
df = df.withColumn(update_col,
    F.when(df[update_col]==old_value,new_value).
    otherwise(df[update_col])).
Paul
quelle
Wie verwende ich das, wenn mein update_col eine Liste ist update_cols=['col1','col2','col3']? Ex- = : ?
GeekFactory
Verwenden Sie eine for-Schleife.
Paul
13

DataFramesbasieren auf RDDs. RDDs sind unveränderliche Strukturen und erlauben keine Aktualisierung von Elementen vor Ort. Um Werte zu ändern, müssen Sie einen neuen DataFrame erstellen, indem Sie den ursprünglichen entweder mit SQL-ähnlichen DSL- oder RDD-Operationen wie transformieren map.

Ein sehr empfehlenswertes Dia-Deck: Einführung von DataFrames in Spark for Large Scale Data Science .

maasg
quelle
3
Was genau fügt die Datenrahmenabstraktion hinzu, die nicht bereits in der gleichen Anzahl von Zeilen mit einer Tabelle ausgeführt werden kann?
Luke
"DataFrames führt neue vereinfachte Operatoren zum Filtern, Aggregieren und Projizieren großer Datenmengen ein. Intern nutzen DataFrames den logischen Spark SQL-Optimierer, um die physische Ausführung von Vorgängen intelligent zu planen und für große Datenmengen
Ansage
11

Genau wie maasg sagt, können Sie einen neuen DataFrame aus dem Ergebnis einer Map erstellen, die auf den alten DataFrame angewendet wird. Ein Beispiel für einen bestimmten DataFrame dfmit zwei Zeilen:

val newDf = sqlContext.createDataFrame(df.map(row => 
  Row(row.getInt(0) + SOMETHING, applySomeDef(row.getAs[Double]("y")), df.schema)

Beachten Sie, dass Sie, wenn sich die Spaltentypen ändern, stattdessen ein korrektes Schema angeben müssen df.schema. Überprüfen Sie die API von org.apache.spark.sql.Rowfür verfügbare Methoden: https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Row.html

[Update] Oder UDFs in Scala verwenden:

import org.apache.spark.sql.functions._

val toLong = udf[Long, String] (_.toLong)

val modifiedDf = df.withColumn("modifiedColumnName", toLong(df("columnName"))).drop("columnName")

und wenn der Spaltenname gleich bleiben muss, können Sie ihn wieder umbenennen:

modifiedDf.withColumnRenamed("modifiedColumnName", "columnName")
radek1st
quelle
2

Importieren von col aus pyspark.sql.functions und Aktualisieren der fünften Spalte auf eine Ganzzahl (0,1,2) basierend auf der Zeichenfolge (Zeichenfolge a, Zeichenfolge b, Zeichenfolge c) in einen neuen DataFrame.

from pyspark.sql.functions import col, when 

data_frame_temp = data_frame.withColumn("col_5",when(col("col_5") == "string a", 0).when(col("col_5") == "string b", 1).otherwise(2))
DHEERAJ
quelle