Spark SQL: Wenden Sie Aggregatfunktionen auf eine Liste von Spalten an

76

Gibt es eine Möglichkeit, eine Aggregatfunktion auf alle (oder eine Liste von) Spalten eines Datenrahmens anzuwenden, wenn Sie a ausführen groupBy? Mit anderen Worten, gibt es eine Möglichkeit, dies für jede Spalte zu vermeiden:

df.groupBy("col1")
  .agg(sum("col2").alias("col2"), sum("col3").alias("col3"), ...)
Lilloraffa
quelle

Antworten:

133

Es gibt mehrere Möglichkeiten, Aggregatfunktionen auf mehrere Spalten anzuwenden.

GroupedDataKlasse bietet eine Anzahl von Methoden für die am häufigsten verwendeten Funktionen, einschließlich der count, max, min, meanund sum, die direkt folgt , wie verwendet werden können:

  • Python:

    df = sqlContext.createDataFrame(
        [(1.0, 0.3, 1.0), (1.0, 0.5, 0.0), (-1.0, 0.6, 0.5), (-1.0, 5.6, 0.2)],
        ("col1", "col2", "col3"))
    
    df.groupBy("col1").sum()
    
    ## +----+---------+-----------------+---------+
    ## |col1|sum(col1)|        sum(col2)|sum(col3)|
    ## +----+---------+-----------------+---------+
    ## | 1.0|      2.0|              0.8|      1.0|
    ## |-1.0|     -2.0|6.199999999999999|      0.7|
    ## +----+---------+-----------------+---------+
    
  • Scala

    val df = sc.parallelize(Seq(
      (1.0, 0.3, 1.0), (1.0, 0.5, 0.0),
      (-1.0, 0.6, 0.5), (-1.0, 5.6, 0.2))
    ).toDF("col1", "col2", "col3")
    
    df.groupBy($"col1").min().show
    
    // +----+---------+---------+---------+
    // |col1|min(col1)|min(col2)|min(col3)|
    // +----+---------+---------+---------+
    // | 1.0|      1.0|      0.3|      0.0|
    // |-1.0|     -1.0|      0.6|      0.2|
    // +----+---------+---------+---------+
    

Optional können Sie eine Liste von Spalten übergeben, die aggregiert werden sollen

df.groupBy("col1").sum("col2", "col3")

Sie können auch ein Wörterbuch / eine Karte mit Spalten und den Schlüsseln übergeben und fungieren als Werte:

  • Python

    exprs = {x: "sum" for x in df.columns}
    df.groupBy("col1").agg(exprs).show()
    
    ## +----+---------+
    ## |col1|avg(col3)|
    ## +----+---------+
    ## | 1.0|      0.5|
    ## |-1.0|     0.35|
    ## +----+---------+
    
  • Scala

    val exprs = df.columns.map((_ -> "mean")).toMap
    df.groupBy($"col1").agg(exprs).show()
    
    // +----+---------+------------------+---------+
    // |col1|avg(col1)|         avg(col2)|avg(col3)|
    // +----+---------+------------------+---------+
    // | 1.0|      1.0|               0.4|      0.5|
    // |-1.0|     -1.0|3.0999999999999996|     0.35|
    // +----+---------+------------------+---------+
    

Schließlich können Sie varargs verwenden:

  • Python

    from pyspark.sql.functions import min
    
    exprs = [min(x) for x in df.columns]
    df.groupBy("col1").agg(*exprs).show()
    
  • Scala

    import org.apache.spark.sql.functions.sum
    
    val exprs = df.columns.map(sum(_))
    df.groupBy($"col1").agg(exprs.head, exprs.tail: _*)
    

Es gibt andere Möglichkeiten, um einen ähnlichen Effekt zu erzielen, aber diese sollten die meiste Zeit mehr als ausreichend sein.

Siehe auch:

null323
quelle
Es scheint aggregateByhier anwendbar zu sein. Es ist schneller (zu viel schneller) als groupBy. Oh warte - das DataFramemacht nicht aus aggregateBy- aggwird darauf hingewiesen groupBy. Nun, das heißt, DataFramessind langsam ..
StephenBoesch
2
@javadba Nein, es bedeutet nur, dass Dataset.groupBy/ Dataset.groupByKeyund RDD.groupBy/ RDD.groupByKey im Allgemeinen unterschiedliche Semantiken haben. Überprüfen Sie dies bei einfachen DataFrameAggregationen . Das ist mehr, aber es ist hier nicht wichtig.
Null 323
3
@ Javadba Danke. Hier ist eine weitere nützliche Ressource (subjektiv, Eigenwerbung): git.io/vM1Ch
zero323
7
Wie füge ich den Spalten einen Alias ​​hinzu?
GeekFactory
4
@ GeekFactoryexprs = [min(x).alias("{0}".format(x)) for x in df.columns]
zero323
21

Ein weiteres Beispiel für dasselbe Konzept - aber sagen wir - Sie haben zwei verschiedene Spalten - und Sie möchten auf jede von ihnen unterschiedliche Agg-Funktionen anwenden, d. H.

f.groupBy("col1").agg(sum("col2").alias("col2"), avg("col3").alias("col3"), ...)

Hier ist der Weg, um dies zu erreichen - obwohl ich noch nicht weiß, wie ich den Alias ​​in diesem Fall hinzufügen soll

Siehe das folgende Beispiel - Verwenden von Karten

val Claim1 = StructType(Seq(StructField("pid", StringType, true),StructField("diag1", StringType, true),StructField("diag2", StringType, true), StructField("allowed", IntegerType, true), StructField("allowed1", IntegerType, true)))
val claimsData1 = Seq(("PID1", "diag1", "diag2", 100, 200), ("PID1", "diag2", "diag3", 300, 600), ("PID1", "diag1", "diag5", 340, 680), ("PID2", "diag3", "diag4", 245, 490), ("PID2", "diag2", "diag1", 124, 248))

val claimRDD1 = sc.parallelize(claimsData1)
val claimRDDRow1 = claimRDD1.map(p => Row(p._1, p._2, p._3, p._4, p._5))
val claimRDD2DF1 = sqlContext.createDataFrame(claimRDDRow1, Claim1)

val l = List("allowed", "allowed1")
val exprs = l.map((_ -> "sum")).toMap
claimRDD2DF1.groupBy("pid").agg(exprs) show false
val exprs = Map("allowed" -> "sum", "allowed1" -> "avg")

claimRDD2DF1.groupBy("pid").agg(exprs) show false
Sumit Pal
quelle
1

Aktuelle Antworten zum Erstellen der Aggregationen sind vollkommen korrekt, aber keine adressiert tatsächlich den Spaltenalias / die Umbenennung, der / die ebenfalls in der Frage angefordert wird.

Normalerweise gehe ich so mit diesem Fall um:

val dimensionFields = List("col1")
val metrics = List("col2", "col3", "col4")
val columnOfInterests = dimensions ++ metrics

val df = spark.read.table("some_table"). 
    .select(columnOfInterests.map(c => col(c)):_*)
    .groupBy(dimensions.map(d => col(d)): _*)
    .agg(metrics.map( m => m -> "sum").toMap)
    .toDF(columnOfInterests:_*)    // that's the interesting part

In der letzten Zeile werden im Wesentlichen alle Spalten des aggregierten Datenrahmens in die ursprünglichen Felder umbenannt, wobei im Wesentlichen geändert wird sum(col2)und sum(col3)einfach col2und col3.

Philippe Oger
quelle