Verketten Sie Spalten in Apache Spark DataFrame

115

Wie verketten wir zwei Spalten in einem Apache Spark DataFrame? Gibt es eine Funktion in Spark SQL, die wir verwenden können?

Nipun
quelle

Antworten:

175

Mit Raw SQL können Sie Folgendes verwenden CONCAT:

  • In Python

    df = sqlContext.createDataFrame([("foo", 1), ("bar", 2)], ("k", "v"))
    df.registerTempTable("df")
    sqlContext.sql("SELECT CONCAT(k, ' ',  v) FROM df")
  • In der Scala

    import sqlContext.implicits._
    
    val df = sc.parallelize(Seq(("foo", 1), ("bar", 2))).toDF("k", "v")
    df.registerTempTable("df")
    sqlContext.sql("SELECT CONCAT(k, ' ',  v) FROM df")

Seit Spark 1.5.0 können Sie concatFunktionen mit der DataFrame-API verwenden:

  • In Python:

    from pyspark.sql.functions import concat, col, lit
    
    df.select(concat(col("k"), lit(" "), col("v")))
  • In Scala:

    import org.apache.spark.sql.functions.{concat, lit}
    
    df.select(concat($"k", lit(" "), $"v"))

Es gibt auch eine concat_wsFunktion, die ein String-Trennzeichen als erstes Argument verwendet.

null323
quelle
45

Hier erfahren Sie, wie Sie benutzerdefinierte Namen erstellen können

import pyspark
from pyspark.sql import functions as sf
sc = pyspark.SparkContext()
sqlc = pyspark.SQLContext(sc)
df = sqlc.createDataFrame([('row11','row12'), ('row21','row22')], ['colname1', 'colname2'])
df.show()

gibt,

+--------+--------+
|colname1|colname2|
+--------+--------+
|   row11|   row12|
|   row21|   row22|
+--------+--------+

Erstellen Sie eine neue Spalte, indem Sie Folgendes verketten:

df = df.withColumn('joined_column', 
                    sf.concat(sf.col('colname1'),sf.lit('_'), sf.col('colname2')))
df.show()

+--------+--------+-------------+
|colname1|colname2|joined_column|
+--------+--------+-------------+
|   row11|   row12|  row11_row12|
|   row21|   row22|  row21_row22|
+--------+--------+-------------+
Myon
quelle
4
literstellt eine Spalte von_
Myon
34

Eine Option zum Verketten von Zeichenfolgenspalten in Spark Scala ist die Verwendung concat.

Es ist notwendig, nach Nullwerten zu suchen . Wenn eine der Spalten null ist, ist das Ergebnis auch dann null, wenn eine der anderen Spalten Informationen enthält.

Verwenden von concatund withColumn:

val newDf =
  df.withColumn(
    "NEW_COLUMN",
    concat(
      when(col("COL1").isNotNull, col("COL1")).otherwise(lit("null")),
      when(col("COL2").isNotNull, col("COL2")).otherwise(lit("null"))))

Verwenden von concatund select:

val newDf = df.selectExpr("concat(nvl(COL1, ''), nvl(COL2, '')) as NEW_COLUMN")

Bei beiden Ansätzen erhalten Sie eine NEW_COLUMN, deren Wert eine Verkettung der Spalten COL1 und COL2 aus Ihrem ursprünglichen df ist.

Ignacio Alorre
quelle
1
Ich habe Ihre Methode in pyspark ausprobiert, aber sie hat nicht funktioniert. Die Warnung "col sollte Spalte sein".
Samson
@ Samson Entschuldigung, ich habe nur
Uhr
3
@IgnacioAlorre Wenn Sie concat_wsanstelle von verwenden concat, können Sie vermeiden, nach NULL zu suchen.
Aswath K
18

Wenn Sie dies mit DF tun möchten, können Sie ein udf verwenden, um eine neue Spalte basierend auf vorhandenen Spalten hinzuzufügen.

val sqlContext = new SQLContext(sc)
case class MyDf(col1: String, col2: String)

//here is our dataframe
val df = sqlContext.createDataFrame(sc.parallelize(
    Array(MyDf("A", "B"), MyDf("C", "D"), MyDf("E", "F"))
))

//Define a udf to concatenate two passed in string values
val getConcatenated = udf( (first: String, second: String) => { first + " " + second } )

//use withColumn method to add a new column called newColName
df.withColumn("newColName", getConcatenated($"col1", $"col2")).select("newColName", "col1", "col2").show()
Dänische Shrestha
quelle
12

Ab Spark 2.3 ( SPARK-22771 ) unterstützt Spark SQL den Verkettungsoperator|| .

Beispielsweise;

val df = spark.sql("select _c1 || _c2 as concat_column from <table_name>")
Krishas
quelle
10

Hier ist eine andere Möglichkeit, dies für pyspark zu tun:

#import concat and lit functions from pyspark.sql.functions 
from pyspark.sql.functions import concat, lit

#Create your data frame
countryDF = sqlContext.createDataFrame([('Ethiopia',), ('Kenya',), ('Uganda',), ('Rwanda',)], ['East Africa'])

#Use select, concat, and lit functions to do the concatenation
personDF = countryDF.select(concat(countryDF['East Africa'], lit('n')).alias('East African'))

#Show the new data frame
personDF.show()

----------RESULT-------------------------

84
+------------+
|East African|
+------------+
|   Ethiopian|
|      Kenyan|
|     Ugandan|
|     Rwandan|
+------------+
Teddy Belay
quelle
7

Hier ist ein Vorschlag, wenn Sie die Anzahl oder den Namen der Spalten im Datenrahmen nicht kennen.

val dfResults = dfSource.select(concat_ws(",",dfSource.columns.map(c => col(c)): _*))
wones0120
quelle
3

concat (* cols)

v1.5 und höher

Verkettet mehrere Eingabespalten zu einer einzigen Spalte. Die Funktion arbeitet mit Zeichenfolgen, binären und kompatiblen Array-Spalten.

Z.B: new_df = df.select(concat(df.a, df.b, df.c))


concat_ws (sep, * cols)

v1.5 und höher

Ähnlich wie concat, verwendet jedoch das angegebene Trennzeichen.

Z.B: new_df = df.select(concat_ws('-', df.col1, df.col2))


map_concat (* cols)

v2.4 und höher

Wird zum Konzentrieren von Karten verwendet und gibt die Vereinigung aller angegebenen Karten zurück.

Z.B: new_df = df.select(map_concat("map1", "map2"))


Verwenden des String Concat Operators ( ||):

v2.3 und höher

Z.B: df = spark.sql("select col_a || col_b || col_c as abc from table_x")

Referenz: Spark SQL-Dokument

Ani Menon
quelle
2

In Spark 2.3.0 können Sie Folgendes tun:

spark.sql( """ select '1' || column_a from table_a """)
Charlie 木匠
quelle
1

In Java können Sie dies tun, um mehrere Spalten zu verketten. Der Beispielcode soll Ihnen ein Szenario und dessen Verwendung zum besseren Verständnis liefern.

SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
Dataset<Row> reducedInventory = spark.sql("select * from table_name")
                        .withColumn("concatenatedCol",
                                concat(col("col1"), lit("_"), col("col2"), lit("_"), col("col3")));


class JavaSparkSessionSingleton {
    private static transient SparkSession instance = null;

    public static SparkSession getInstance(SparkConf sparkConf) {
        if (instance == null) {
            instance = SparkSession.builder().config(sparkConf)
                    .getOrCreate();
        }
        return instance;
    }
}

Der obige Code verkettet col1, col2, col3 durch "_", um eine Spalte mit dem Namen "concatenatedCol" zu erstellen.

Wandermonk
quelle
1

Haben wir eine Java-Syntax, die dem folgenden Prozess entspricht?

val dfResults = dfSource.select(concat_ws(",",dfSource.columns.map(c => col(c)): _*))
Roopesh MB
quelle
0

Eine andere Möglichkeit, dies in pySpark mit sqlContext zu tun ...

#Suppose we have a dataframe:
df = sqlContext.createDataFrame([('row1_1','row1_2')], ['colname1', 'colname2'])

# Now we can concatenate columns and assign the new column a name 
df = df.select(concat(df.colname1, df.colname2).alias('joined_colname'))
Gur
quelle
0

In der Tat gibt es einige schöne eingebaute Abstraktionen, mit denen Sie Ihre Verkettung durchführen können, ohne eine benutzerdefinierte Funktion implementieren zu müssen. Da Sie Spark SQL erwähnt haben, versuchen Sie vermutlich, es als deklarativen Befehl über spark.sql () zu übergeben. In diesem Fall können Sie den SQL-Befehl wie folgt auf einfache Weise übergeben: SELECT CONCAT(col1, '<delimiter>', col2, ...) AS concat_column_name FROM <table_name>;

Ab Spark 2.3.0 können Sie außerdem Befehle in folgenden Zeilen verwenden: SELECT col1 || col2 AS concat_column_name FROM <table_name>;

Dabei handelt es sich um Ihr bevorzugtes Trennzeichen (kann auch ein leerer Bereich sein) und um die temporäre oder permanente Tabelle, aus der Sie lesen möchten.


quelle
0

Wir können auch einfach SelectExpr verwenden. df1.selectExpr ("*", "obere (_2 || _3) wie neu")

Deepak Saxena
quelle