Fensterfunktionen :
So etwas sollte den Trick machen:
import org.apache.spark.sql.functions.{row_number, max, broadcast}
import org.apache.spark.sql.expressions.Window
val df = sc.parallelize(Seq(
(0,"cat26",30.9), (0,"cat13",22.1), (0,"cat95",19.6), (0,"cat105",1.3),
(1,"cat67",28.5), (1,"cat4",26.8), (1,"cat13",12.6), (1,"cat23",5.3),
(2,"cat56",39.6), (2,"cat40",29.7), (2,"cat187",27.9), (2,"cat68",9.8),
(3,"cat8",35.6))).toDF("Hour", "Category", "TotalValue")
val w = Window.partitionBy($"hour").orderBy($"TotalValue".desc)
val dfTop = df.withColumn("rn", row_number.over(w)).where($"rn" === 1).drop("rn")
dfTop.show
// +----+--------+----------+
// |Hour|Category|TotalValue|
// +----+--------+----------+
// | 0| cat26| 30.9|
// | 1| cat67| 28.5|
// | 2| cat56| 39.6|
// | 3| cat8| 35.6|
// +----+--------+----------+
Diese Methode ist im Falle eines signifikanten Datenversatzes ineffizient.
Einfache SQL-Aggregation, gefolgt vonjoin
:
Alternativ können Sie sich mit einem aggregierten Datenrahmen verbinden:
val dfMax = df.groupBy($"hour".as("max_hour")).agg(max($"TotalValue").as("max_value"))
val dfTopByJoin = df.join(broadcast(dfMax),
($"hour" === $"max_hour") && ($"TotalValue" === $"max_value"))
.drop("max_hour")
.drop("max_value")
dfTopByJoin.show
// +----+--------+----------+
// |Hour|Category|TotalValue|
// +----+--------+----------+
// | 0| cat26| 30.9|
// | 1| cat67| 28.5|
// | 2| cat56| 39.6|
// | 3| cat8| 35.6|
// +----+--------+----------+
Es werden doppelte Werte beibehalten (wenn es mehr als eine Kategorie pro Stunde mit demselben Gesamtwert gibt). Sie können diese wie folgt entfernen:
dfTopByJoin
.groupBy($"hour")
.agg(
first("category").alias("category"),
first("TotalValue").alias("TotalValue"))
Verwenden der Bestellung überstructs
:
Ordentlicher, wenn auch nicht sehr gut getesteter Trick, der keine Verknüpfungen oder Fensterfunktionen erfordert:
val dfTop = df.select($"Hour", struct($"TotalValue", $"Category").alias("vs"))
.groupBy($"hour")
.agg(max("vs").alias("vs"))
.select($"Hour", $"vs.Category", $"vs.TotalValue")
dfTop.show
// +----+--------+----------+
// |Hour|Category|TotalValue|
// +----+--------+----------+
// | 0| cat26| 30.9|
// | 1| cat67| 28.5|
// | 2| cat56| 39.6|
// | 3| cat8| 35.6|
// +----+--------+----------+
Mit DataSet API (Spark 1.6+, 2.0+):
Spark 1.6 :
case class Record(Hour: Integer, Category: String, TotalValue: Double)
df.as[Record]
.groupBy($"hour")
.reduce((x, y) => if (x.TotalValue > y.TotalValue) x else y)
.show
// +---+--------------+
// | _1| _2|
// +---+--------------+
// |[0]|[0,cat26,30.9]|
// |[1]|[1,cat67,28.5]|
// |[2]|[2,cat56,39.6]|
// |[3]| [3,cat8,35.6]|
// +---+--------------+
Spark 2.0 oder höher :
df.as[Record]
.groupByKey(_.Hour)
.reduceGroups((x, y) => if (x.TotalValue > y.TotalValue) x else y)
Die letzten beiden Methoden können die kartenseitige Kombination nutzen und erfordern kein vollständiges Mischen, sodass die meiste Zeit eine bessere Leistung im Vergleich zu Fensterfunktionen und Verknüpfungen erzielt werden sollte. Diese können auch mit Structured Streaming in verwendet werdencompleted
Ausgabemodus verwendet werden.
Verwenden Sie nicht :
df.orderBy(...).groupBy(...).agg(first(...), ...)
Es scheint zu funktionieren (insbesondere im local
Modus), ist aber unzuverlässig (siehe SPARK-16207 , Tzach Zohar für die Verknüpfung des relevanten JIRA-Problems und SPARK-30335) ).
Der gleiche Hinweis gilt für
df.orderBy(...).dropDuplicates(...)
die intern äquivalenten Ausführungsplan verwendet.
Für Spark 2.0.2 mit Gruppierung nach mehreren Spalten:
quelle
Dies ist genau das Gleiche wie die Antwort von zero323 , jedoch in SQL-Abfrage.
Angenommen, der Datenrahmen wird erstellt und registriert als
Fensterfunktion:
Einfache SQL-Aggregation, gefolgt von Join:
Verwenden der Bestellung über Strukturen:
DataSets Weg und nicht tun sind die gleichen wie in der ursprünglichen Antwort
quelle
Das Muster ist nach Schlüsseln gruppiert => mache etwas mit jeder Gruppe, zB reduziere => kehre zum Datenrahmen zurück
Ich fand die Dataframe-Abstraktion in diesem Fall etwas umständlich, daher habe ich die RDD-Funktionalität verwendet
quelle
Die folgende Lösung führt nur eine Gruppe durch und extrahiert die Zeilen Ihres Datenrahmens, die den maxValue enthalten, auf einmal. Keine weiteren Joins oder Windows erforderlich.
quelle
Eine gute Möglichkeit, dies mit der Dataframe-API zu tun, ist die Verwendung der Argmax-Logik
quelle
Hier können Sie so machen -
quelle
Wir können die Funktion des Fensters rank () verwenden (wobei Sie den Rang = 1 wählen würden). Der Rang fügt nur eine Zahl für jede Zeile einer Gruppe hinzu (in diesem Fall wäre es die Stunde).
Hier ist ein Beispiel. (von https://github.com/jaceklaskowski/mastering-apache-spark-book/blob/master/spark-sql-functions.adoc#rank )
quelle