Bei Betrachtung der neuen Spark-Datenrahmen-API ist unklar, ob es möglich ist, Datenrahmenspalten zu ändern.
Wie würde ich mich über einen Wert in der Zeile zu ändern x
Spalte y
eines Datenrahmens?
In pandas
diesem wäredf.ix[x,y] = new_value
Bearbeiten: Wenn Sie das unten Gesagte konsolidieren, können Sie den vorhandenen Datenrahmen nicht ändern, da er unveränderlich ist. Sie können jedoch einen neuen Datenrahmen mit den gewünschten Änderungen zurückgeben.
Wenn Sie nur einen Wert in einer Spalte ersetzen möchten, der auf einer Bedingung basiert, wie z np.where
.
from pyspark.sql import functions as F
update_func = (F.when(F.col('update_col') == replace_val, new_value)
.otherwise(F.col('update_col')))
df = df.withColumn('new_column_name', update_func)
Wenn Sie eine Operation für eine Spalte ausführen und eine neue Spalte erstellen möchten, die dem Datenrahmen hinzugefügt wird:
import pyspark.sql.functions as F
import pyspark.sql.types as T
def my_func(col):
do stuff to column here
return transformed_value
# if we assume that my_func returns a string
my_udf = F.UserDefinedFunction(my_func, T.StringType())
df = df.withColumn('new_column_name', my_udf('update_col'))
Wenn die neue Spalte denselben Namen wie die alte Spalte haben soll, können Sie den zusätzlichen Schritt hinzufügen:
df = df.drop('update_col').withColumnRenamed('new_column_name', 'update_col')
Antworten:
Während Sie eine Spalte als solche nicht ändern können, können Sie eine Spalte bearbeiten und einen neuen DataFrame zurückgeben, der diese Änderung widerspiegelt. Dazu erstellen Sie zuerst eine
UserDefinedFunction
Implementierung der anzuwendenden Operation und wenden diese Funktion dann selektiv nur auf die Zielspalte an. In Python:from pyspark.sql.functions import UserDefinedFunction from pyspark.sql.types import StringType name = 'target_column' udf = UserDefinedFunction(lambda x: 'new_value', StringType()) new_df = old_df.select(*[udf(column).alias(name) if column == name else column for column in old_df.columns])
new_df
hat jetzt das gleiche Schema wieold_df
(vorausgesetzt, esold_df.target_column
war auch vom TypStringType
), aber alle Werte in der Spaltetarget_column
sindnew_value
.quelle
new_df = old_df.withColumn('target_column', udf(df.name))
df.select([df[col], df[col].alias('same_column')])
Wocol
ist der Name der Spalte, die Sie duplizieren möchten? Mit der neuesten Spark-Version können viele Dinge, für die ich UDFs verwendet habe, mit den in definierten Funktionen ausgeführt werdenpyspark.sql.functions
. Die UDF-Leistung in Pyspark ist wirklich schlecht, daher lohnt es sich möglicherweise, einen Blick darauf zu werfenStringType
nichtStringtype
inudf = UserDefinedFunction(lambda x: 'new_value', Stringtype())
In der Regel möchten wir beim Aktualisieren einer Spalte einen alten Wert einem neuen Wert zuordnen. Hier ist eine Möglichkeit, dies im Pyspark ohne UDFs zu tun:
# update df[update_col], mapping old_value --> new_value from pyspark.sql import functions as F df = df.withColumn(update_col, F.when(df[update_col]==old_value,new_value). otherwise(df[update_col])).
quelle
update_cols=['col1','col2','col3']
? Ex- = : ?DataFrames
basieren auf RDDs. RDDs sind unveränderliche Strukturen und erlauben keine Aktualisierung von Elementen vor Ort. Um Werte zu ändern, müssen Sie einen neuen DataFrame erstellen, indem Sie den ursprünglichen entweder mit SQL-ähnlichen DSL- oder RDD-Operationen wie transformierenmap
.Ein sehr empfehlenswertes Dia-Deck: Einführung von DataFrames in Spark for Large Scale Data Science .
quelle
Genau wie maasg sagt, können Sie einen neuen DataFrame aus dem Ergebnis einer Map erstellen, die auf den alten DataFrame angewendet wird. Ein Beispiel für einen bestimmten DataFrame
df
mit zwei Zeilen:val newDf = sqlContext.createDataFrame(df.map(row => Row(row.getInt(0) + SOMETHING, applySomeDef(row.getAs[Double]("y")), df.schema)
Beachten Sie, dass Sie, wenn sich die Spaltentypen ändern, stattdessen ein korrektes Schema angeben müssen
df.schema
. Überprüfen Sie die API vonorg.apache.spark.sql.Row
für verfügbare Methoden: https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Row.html[Update] Oder UDFs in Scala verwenden:
import org.apache.spark.sql.functions._ val toLong = udf[Long, String] (_.toLong) val modifiedDf = df.withColumn("modifiedColumnName", toLong(df("columnName"))).drop("columnName")
und wenn der Spaltenname gleich bleiben muss, können Sie ihn wieder umbenennen:
modifiedDf.withColumnRenamed("modifiedColumnName", "columnName")
quelle
Importieren von col aus pyspark.sql.functions und Aktualisieren der fünften Spalte auf eine Ganzzahl (0,1,2) basierend auf der Zeichenfolge (Zeichenfolge a, Zeichenfolge b, Zeichenfolge c) in einen neuen DataFrame.
from pyspark.sql.functions import col, when data_frame_temp = data_frame.withColumn("col_5",when(col("col_5") == "string a", 0).when(col("col_5") == "string b", 1).otherwise(2))
quelle