Es wird ein Ordner mit mehreren Dateien erstellt, da jede Partition einzeln gespeichert wird. Wenn Sie eine einzelne Ausgabedatei benötigen (die sich noch in einem Ordner befindet), können Sie dies tun repartition
(bevorzugt, wenn die Upstream-Daten groß sind, aber eine Zufallswiedergabe erforderlich sind):
df
.repartition(1)
.write.format("com.databricks.spark.csv")
.option("header", "true")
.save("mydata.csv")
oder coalesce
:
df
.coalesce(1)
.write.format("com.databricks.spark.csv")
.option("header", "true")
.save("mydata.csv")
Datenrahmen vor dem Speichern:
Alle Daten werden in geschrieben mydata.csv/part-00000
. Bevor Sie diese Option verwenden , stellen Sie sicher, dass Sie wissen, was los ist und wie hoch die Kosten für die Übertragung aller Daten an einen einzelnen Mitarbeiter sind . Wenn Sie ein verteiltes Dateisystem mit Replikation verwenden, werden Daten mehrmals übertragen - zuerst an einen einzelnen Worker abgerufen und anschließend auf Speicherknoten verteilt.
Alternativ können Sie Ihren Code unverändert lassen und allgemeine Tools wie cat
oder HDFS verwenden,getmerge
um anschließend einfach alle Teile zusammenzuführen.
.coalesce(1)
löst einen Fehler aus, wenn wir festlegen , dass eine FileNotFoundException im Verzeichnis _temporary vorliegt. Es ist immer noch ein Fehler im Funken: Issues.apache.org/jira/browse/SPARK-2984coalesce(1)
weil es sehr teuer und normalerweise nicht praktisch ist.Wenn Sie Spark mit HDFS ausführen, habe ich das Problem gelöst, indem ich CSV-Dateien normal geschrieben und HDFS für das Zusammenführen verwendet habe. Ich mache das direkt in Spark (1.6):
Ich kann mich nicht erinnern, wo ich diesen Trick gelernt habe, aber er könnte für Sie funktionieren.
quelle
Ich bin vielleicht etwas spät dran, aber mit
coalesce(1)
oderrepartition(1)
wenn kleine Datensätze für sie arbeite, werden große Datensätze alle in eine Partition auf einem Knoten geworfen. Dies führt wahrscheinlich zu OOM-Fehlern oder bestenfalls zu einer langsamen Verarbeitung.Ich würde empfehlen, dass Sie die verwenden
FileUtil.copyMerge()
Funktion von der Hadoop-API verwenden. Dadurch werden die Ausgaben in einer einzigen Datei zusammengeführt.BEARBEITEN - Dies bringt die Daten effektiv zum Treiber und nicht zu einem Executor-Knoten.
Coalesce()
wäre in Ordnung, wenn ein einzelner Executor mehr RAM zur Verfügung hätte als der Treiber.EDIT 2 :
copyMerge()
wird in Hadoop 3.0 entfernt. Weitere Informationen zum Arbeiten mit der neuesten Version finden Sie im folgenden Artikel zum Stapelüberlauf: Wie wird CopyMerge in Hadoop 3.0 ausgeführt?quelle
Wenn Sie Databricks verwenden und alle Daten auf einem Worker in den Arbeitsspeicher einpassen können (und somit verwenden können
.coalesce(1)
), können Sie mit dbfs die resultierende CSV-Datei suchen und verschieben:Wenn Ihre Datei nicht in den Arbeitsspeicher des Workers passt, sollten Sie den Vorschlag von chaotic3quilibrium in Betracht ziehen, FileUtils.copyMerge () zu verwenden. . Ich habe dies nicht getan und weiß noch nicht, ob dies möglich ist oder nicht, z. B. auf S3.
Diese Antwort basiert auf früheren Antworten auf diese Frage sowie meinen eigenen Tests des bereitgestellten Code-Snippets. Ich habe es ursprünglich bei Databricks gepostet und veröffentliche es hier erneut.
Die beste Dokumentation für die rekursive Option von dbfs 'rm, die ich gefunden habe, befindet sich in einem Databricks-Forum .
quelle
Eine Lösung, die für S3 funktioniert und von Minkymorgan modifiziert wurde.
Übergeben Sie einfach den temporären partitionierten Verzeichnispfad (mit einem anderen Namen als dem endgültigen Pfad) als
srcPath
und den einzelnen endgültigen csv / txt alsdestPath
auch angebendeleteSource
Sie Sie ob Sie das ursprüngliche Verzeichnis entfernen möchten.quelle
Die Spark-
df.write()
API erstellt mehrere Teiledateien innerhalb des angegebenen Pfads ... um zu erzwingen, dass Spark nur eine einzelne Teiledatei verwendet,df.coalesce(1).write.csv(...)
anstatt dassdf.repartition(1).write.csv(...)
Coalesce eine enge Transformation ist, während Repartition eine umfassende Transformation ist, siehe Spark - repartition () vs coalesce ()erstellt einen Ordner im angegebenen Dateipfad mit einer
part-0001-...-c000.csv
Dateiverwendungeinen benutzerfreundlichen Dateinamen haben
quelle
df.toPandas().to_csv(path)
einzelne CSV mit Ihrem bevorzugten Dateinamen schreibenvor dem Speichern auf 1 Partition neu partitionieren / zusammenführen (Sie würden immer noch einen Ordner erhalten, der jedoch eine Teiledatei enthalten würde)
quelle
Sie können verwenden
rdd.coalesce(1, true).saveAsTextFile(path)
Es speichert Daten als einzelne Datei in Pfad / Teil-00000
quelle
Ich habe mit dem folgenden Ansatz gelöst (hdfs Dateiname umbenennen): -
Schritt 1: - (Crate Data Frame und Schreiben in HDFS)
Schritt 2: - (Hadoop-Konfiguration erstellen)
Schritt 3: - (Pfad im HDFS-Ordnerpfad abrufen)
Schritt 4: - (Spark-Dateinamen aus dem HDFS-Ordner abrufen)
setp5: - (Erstellen Sie eine veränderbare Scala-Liste, um alle Dateinamen zu speichern und zur Liste hinzuzufügen.)
Schritt 6: - (Filter _SUCESS Dateireihenfolge aus Dateinamen Scala Liste)
Schritt 7: - (Scala-Liste in Zeichenfolge konvertieren und gewünschten Dateinamen zur Zeichenfolge des HDFS-Ordners hinzufügen und dann umbenennen)
quelle
Diese Antwort erweitert die akzeptierte Antwort, bietet mehr Kontext und Code-Snippets, die Sie in der Spark-Shell auf Ihrem Computer ausführen können.
Mehr Kontext zur akzeptierten Antwort
Die akzeptierte Antwort könnte den Eindruck erwecken, dass der Beispielcode eine einzelne
mydata.csv
Datei ausgibt, und das ist nicht der Fall. Lassen Sie uns demonstrieren:Folgendes wird ausgegeben:
NB
mydata.csv
ist ein Ordner in der akzeptierten Antwort - es ist keine Datei!So geben Sie eine einzelne Datei mit einem bestimmten Namen aus
Wir können spark-daria verwenden , um eine einzelne
mydata.csv
Datei zu schreiben .Dadurch wird die Datei wie folgt ausgegeben:
S3-Pfade
Sie müssen s3a-Pfade übergeben
DariaWriters.writeSingleFile
, um diese Methode in S3 verwenden zu können:Siehe hier für weitere Informationen.
CopyMerge vermeiden
copyMerge wurde aus Hadoop 3 entfernt. Die
DariaWriters.writeSingleFile
Implementierung verwendetfs.rename
, wie hier beschrieben . Spark 3 verwendete immer noch Hadoop 2 , sodass CopyMerge-Implementierungen im Jahr 2020 funktionieren werden. Ich bin nicht sicher, wann Spark auf Hadoop 3 aktualisiert wird, aber es ist besser, jeden copyMerge-Ansatz zu vermeiden, der dazu führt, dass Ihr Code beim Upgrade von Spark Hadoop beschädigt wird.Quellcode
Suchen Sie
DariaWriters
im Spark-Daria-Quellcode nach dem Objekt, wenn Sie die Implementierung überprüfen möchten.PySpark-Implementierung
Mit PySpark ist es einfacher, eine einzelne Datei zu schreiben, da Sie den DataFrame in einen Pandas DataFrame konvertieren können, der standardmäßig als einzelne Datei geschrieben wird.
Einschränkungen
Der
DariaWriters.writeSingleFile
Scala-Ansatz und derdf.toPandas()
Python-Ansatz funktionieren nur für kleine Datensätze. Riesige Datensätze können nicht als einzelne Dateien ausgeschrieben werden. Das Schreiben von Daten als einzelne Datei ist aus Sicht der Leistung nicht optimal, da die Daten nicht parallel geschrieben werden können.quelle
Ich verwende dies in Python, um eine einzelne Datei zu erhalten:
quelle
Mit Listbuffer können wir Daten in einer einzigen Datei speichern:
quelle
Es gibt noch eine Möglichkeit, Java zu verwenden
quelle