Ich arbeite an einem Datenrahmen mit zwei Spalten, mvv und count.
+---+-----+
|mvv|count|
+---+-----+
| 1 | 5 |
| 2 | 9 |
| 3 | 3 |
| 4 | 1 |
Ich möchte zwei Listen mit MVV-Werten und Zählwert erhalten. Etwas wie
mvv = [1,2,3,4]
count = [5,9,3,1]
Also habe ich den folgenden Code ausprobiert: Die erste Zeile sollte eine Python-Zeilenliste zurückgeben. Ich wollte den ersten Wert sehen:
mvv_list = mvv_count_df.select('mvv').collect()
firstvalue = mvv_list[0].getInt(0)
Aber ich bekomme eine Fehlermeldung mit der zweiten Zeile:
AttributeError: getInt
python
apache-spark
pyspark
spark-dataframe
a.moussa
quelle
quelle
list(df.select('mvv').toPandas()['mvv'])
. Arrow wurde in PySpark integriert, was sichtoPandas
erheblich beschleunigte . Verwenden Sie nicht die anderen Ansätze, wenn Sie Spark 2.3+ verwenden. Weitere Benchmarking-Details finden Sie in meiner Antwort.Antworten:
Sehen Sie, warum diese Art und Weise, wie Sie es tun, nicht funktioniert. Zunächst versuchen Sie, eine Ganzzahl aus einem Zeilentyp abzurufen. Die Ausgabe Ihrer Sammlung sieht folgendermaßen aus:
Wenn Sie so etwas nehmen:
Sie erhalten den
mvv
Wert. Wenn Sie alle Informationen des Arrays möchten, können Sie Folgendes tun:Wenn Sie jedoch dasselbe für die andere Spalte versuchen, erhalten Sie:
Dies geschieht, weil
count
es sich um eine integrierte Methode handelt. Und die Spalte hat den gleichen Namen wiecount
. Eine Problemumgehung hierfür ist das Ändern des Spaltennamenscount
in_count
:Diese Problemumgehung ist jedoch nicht erforderlich, da Sie über die Wörterbuchsyntax auf die Spalte zugreifen können:
Und es wird endlich funktionieren!
quelle
select('count')
Verwendung nicht wiecount_list = [int(i.count) for i in mvv_list.collect()]
folgt hinzufügen : Ich werde das Beispiel zur Antwort hinzufügen.[i.['count'] for i in mvv_list.collect()]
arbeitet, um es explizit zu machen, die Spalte 'count' und nicht diecount
Funktion zu verwendenWenn Sie einem Liner folgen, erhalten Sie die gewünschte Liste.
quelle
Dadurch erhalten Sie alle Elemente als Liste.
quelle
Der folgende Code hilft Ihnen dabei
quelle
Für meine Daten habe ich folgende Benchmarks erhalten:
0,52 Sek
0,271 Sek
0,427 Sek
Das Ergebnis ist das gleiche
quelle
toLocalIterator
stattdessen verwendencollect
, sollte es sogar speichereffizienter sein[row[col] for row in data.toLocalIterator()]
Wenn Sie den folgenden Fehler erhalten:
Dieser Code löst Ihre Probleme:
quelle
Ich habe eine Benchmarking-Analyse durchgeführt und bin
list(mvv_count_df.select('mvv').toPandas()['mvv'])
die schnellste Methode. Ich bin sehr überrascht.Ich habe die verschiedenen Ansätze für 100.000 / 100 Millionen Zeilendatensätze mit einem i3.xlarge-Cluster mit 5 Knoten (jeder Knoten verfügt über 30,5 GB RAM und 4 Kerne) mit Spark 2.4.5 ausgeführt. Die Daten wurden gleichmäßig auf 20 bissig komprimierte Parkettdateien mit einer einzigen Spalte verteilt.
Hier sind die Benchmarking-Ergebnisse (Laufzeiten in Sekunden):
Goldene Regeln beim Sammeln von Daten auf dem Treiberknoten:
toPandas
wurde in Spark 2.3 signifikant verbessert . Dies ist wahrscheinlich nicht der beste Ansatz, wenn Sie eine Spark-Version vor 2.3 verwenden.Sehen Sie hier für weitere Informationen / Benchmarking Ergebnisse.
quelle
Eine mögliche Lösung ist die Verwendung der
collect_list()
Funktion vonpyspark.sql.functions
. Dadurch werden alle Spaltenwerte zu einem Pyspark-Array zusammengefasst, das beim Sammeln in eine Python-Liste konvertiert wird:quelle