Schreiben Sie eine einzelne CSV-Datei mit spark-csv

108

Ich verwende https://github.com/databricks/spark-csv . Ich versuche, eine einzelne CSV zu schreiben, kann dies jedoch nicht. Es wird ein Ordner erstellt.

Benötigen Sie eine Scala-Funktion, die Parameter wie Pfad und Dateiname verwendet und diese CSV-Datei schreibt.

user1735076
quelle

Antworten:

167

Es wird ein Ordner mit mehreren Dateien erstellt, da jede Partition einzeln gespeichert wird. Wenn Sie eine einzelne Ausgabedatei benötigen (die sich noch in einem Ordner befindet), können Sie dies tun repartition(bevorzugt, wenn die Upstream-Daten groß sind, aber eine Zufallswiedergabe erforderlich sind):

df
   .repartition(1)
   .write.format("com.databricks.spark.csv")
   .option("header", "true")
   .save("mydata.csv")

oder coalesce:

df
   .coalesce(1)
   .write.format("com.databricks.spark.csv")
   .option("header", "true")
   .save("mydata.csv")

Datenrahmen vor dem Speichern:

Alle Daten werden in geschrieben mydata.csv/part-00000. Bevor Sie diese Option verwenden , stellen Sie sicher, dass Sie wissen, was los ist und wie hoch die Kosten für die Übertragung aller Daten an einen einzelnen Mitarbeiter sind . Wenn Sie ein verteiltes Dateisystem mit Replikation verwenden, werden Daten mehrmals übertragen - zuerst an einen einzelnen Worker abgerufen und anschließend auf Speicherknoten verteilt.

Alternativ können Sie Ihren Code unverändert lassen und allgemeine Tools wie catoder HDFS verwenden,getmerge um anschließend einfach alle Teile zusammenzuführen.

null323
quelle
6
Sie können coalesce auch verwenden: df.coalesce (1) .write.format ("com.databricks.spark.csv") .option ("header", "true") .save ("mydata.csv")
ravi
spark 1.6 .coalesce(1)löst einen Fehler aus, wenn wir festlegen , dass eine FileNotFoundException im Verzeichnis _temporary vorliegt. Es ist immer noch ein Fehler im Funken: Issues.apache.org/jira/browse/SPARK-2984
Harsha
@ Harsha unwahrscheinlich. Eher ein einfaches Ergebnis, coalesce(1)weil es sehr teuer und normalerweise nicht praktisch ist.
Null323
Einverstanden @ zero323, aber wenn Sie eine spezielle Anforderung zur Konsolidierung in einer Datei haben, sollte dies dennoch möglich sein, da Sie über ausreichende Ressourcen und Zeit verfügen.
Harsha
2
@ Harsha Ich sage nicht, dass es keine gibt. Wenn Sie GC richtig einstellen, sollte es gut funktionieren, aber es ist einfach Zeitverschwendung und wird höchstwahrscheinlich die Gesamtleistung beeinträchtigen. Ich persönlich sehe keinen Grund, mich darum zu kümmern, zumal es trivial einfach ist, Dateien außerhalb von Spark zusammenzuführen, ohne sich um die Speichernutzung zu kümmern.
Null323
36

Wenn Sie Spark mit HDFS ausführen, habe ich das Problem gelöst, indem ich CSV-Dateien normal geschrieben und HDFS für das Zusammenführen verwendet habe. Ich mache das direkt in Spark (1.6):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._

def merge(srcPath: String, dstPath: String): Unit =  {
   val hadoopConfig = new Configuration()
   val hdfs = FileSystem.get(hadoopConfig)
   FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null) 
   // the "true" setting deletes the source files once they are merged into the new output
}


val newData = << create your dataframe >>


val outputfile = "/user/feeds/project/outputs/subject"  
var filename = "myinsights"
var outputFileName = outputfile + "/temp_" + filename 
var mergedFileName = outputfile + "/merged_" + filename
var mergeFindGlob  = outputFileName

    newData.write
        .format("com.databricks.spark.csv")
        .option("header", "false")
        .mode("overwrite")
        .save(outputFileName)
    merge(mergeFindGlob, mergedFileName )
    newData.unpersist()

Ich kann mich nicht erinnern, wo ich diesen Trick gelernt habe, aber er könnte für Sie funktionieren.

Minkymorgan
quelle
Ich habe es nicht versucht - und vermute, dass es nicht einfach ist.
Minkymorgan
1
Vielen Dank. Ich habe eine Antwort hinzugefügt , die auf Databricks funktioniert
Josiah Yoder
@Minkymorgan Ich habe ein ähnliches Problem, kann es aber nicht richtig machen. Kannst du dir bitte diese Frage ansehen stackoverflow.com/questions/46812388/…
SUDARSHAN
4
@SUDARSHAN Meine obige Funktion funktioniert mit unkomprimierten Daten. In Ihrem Beispiel verwenden Sie die GZIP-Komprimierung, wenn Sie Dateien schreiben - und anschließend versuchen, diese zusammenzuführen, was fehlschlägt. Das wird nicht funktionieren, da Sie keine gzip-Dateien zusammenführen können. Gzip ist kein Splittable Compression-Algorithmus, also sicherlich nicht "zusammenführbar". Sie könnten die Komprimierung "bissig" oder "bz2" testen - aber das Bauchgefühl ist, dass dies auch beim Zusammenführen fehlschlägt. Am besten entfernen Sie wahrscheinlich die Komprimierung, führen Rohdateien zusammen und komprimieren sie dann mit einem aufteilbaren Codec.
Minkymorgan
und was ist, wenn ich den Header beibehalten möchte? es dupliziert für jeden Dateiteil
Normal
32

Ich bin vielleicht etwas spät dran, aber mit coalesce(1)oderrepartition(1) wenn kleine Datensätze für sie arbeite, werden große Datensätze alle in eine Partition auf einem Knoten geworfen. Dies führt wahrscheinlich zu OOM-Fehlern oder bestenfalls zu einer langsamen Verarbeitung.

Ich würde empfehlen, dass Sie die verwenden FileUtil.copyMerge() Funktion von der Hadoop-API verwenden. Dadurch werden die Ausgaben in einer einzigen Datei zusammengeführt.

BEARBEITEN - Dies bringt die Daten effektiv zum Treiber und nicht zu einem Executor-Knoten.Coalesce()wäre in Ordnung, wenn ein einzelner Executor mehr RAM zur Verfügung hätte als der Treiber.

EDIT 2 : copyMerge()wird in Hadoop 3.0 entfernt. Weitere Informationen zum Arbeiten mit der neuesten Version finden Sie im folgenden Artikel zum Stapelüberlauf: Wie wird CopyMerge in Hadoop 3.0 ausgeführt?

etspaceman
quelle
Irgendwelche Gedanken darüber, wie man auf diese Weise eine CSV mit einer Kopfzeile erhält? Ich möchte nicht, dass die Datei einen Header erzeugt, da dies Header in der gesamten Datei verteilt, einen für jede Partition.
nojo
Es gibt eine Option, die ich in der Vergangenheit verwendet habe und die hier dokumentiert ist: markhneedham.com/blog/2014/11/30/…
etspaceman
@etspaceman Cool. Ich habe leider immer noch keine gute Möglichkeit, dies zu tun, da ich dies in Java (oder Spark) tun muss, aber auf eine Weise, die nicht viel Speicher verbraucht und mit großen Dateien arbeiten kann. . Ich kann immer noch nicht glauben, dass sie diesen API-Aufruf entfernt haben ... dies ist eine sehr häufige Verwendung, auch wenn sie nicht genau von anderen Anwendungen im Hadoop-Ökosystem verwendet wird.
Woot
20

Wenn Sie Databricks verwenden und alle Daten auf einem Worker in den Arbeitsspeicher einpassen können (und somit verwenden können .coalesce(1)), können Sie mit dbfs die resultierende CSV-Datei suchen und verschieben:

val fileprefix= "/mnt/aws/path/file-prefix"

dataset
  .coalesce(1)       
  .write             
//.mode("overwrite") // I usually don't use this, but you may want to.
  .option("header", "true")
  .option("delimiter","\t")
  .csv(fileprefix+".tmp")

val partition_path = dbutils.fs.ls(fileprefix+".tmp/")
     .filter(file=>file.name.endsWith(".csv"))(0).path

dbutils.fs.cp(partition_path,fileprefix+".tab")

dbutils.fs.rm(fileprefix+".tmp",recurse=true)

Wenn Ihre Datei nicht in den Arbeitsspeicher des Workers passt, sollten Sie den Vorschlag von chaotic3quilibrium in Betracht ziehen, FileUtils.copyMerge () zu verwenden. . Ich habe dies nicht getan und weiß noch nicht, ob dies möglich ist oder nicht, z. B. auf S3.

Diese Antwort basiert auf früheren Antworten auf diese Frage sowie meinen eigenen Tests des bereitgestellten Code-Snippets. Ich habe es ursprünglich bei Databricks gepostet und veröffentliche es hier erneut.

Die beste Dokumentation für die rekursive Option von dbfs 'rm, die ich gefunden habe, befindet sich in einem Databricks-Forum .

Josiah Yoder
quelle
3

Eine Lösung, die für S3 funktioniert und von Minkymorgan modifiziert wurde.

Übergeben Sie einfach den temporären partitionierten Verzeichnispfad (mit einem anderen Namen als dem endgültigen Pfad) als srcPathund den einzelnen endgültigen csv / txt als destPath auch angebendeleteSource Sie Sie ob Sie das ursprüngliche Verzeichnis entfernen möchten.

/**
* Merges multiple partitions of spark text file output into single file. 
* @param srcPath source directory of partitioned files
* @param dstPath output path of individual path
* @param deleteSource whether or not to delete source directory after merging
* @param spark sparkSession
*/
def mergeTextFiles(srcPath: String, dstPath: String, deleteSource: Boolean): Unit =  {
  import org.apache.hadoop.fs.FileUtil
  import java.net.URI
  val config = spark.sparkContext.hadoopConfiguration
  val fs: FileSystem = FileSystem.get(new URI(srcPath), config)
  FileUtil.copyMerge(
    fs, new Path(srcPath), fs, new Path(dstPath), deleteSource, config, null
  )
}
John Zhu
quelle
Die copyMerge-Implementierung listet alle Dateien auf und iteriert darüber. Dies ist in s3 nicht sicher. Wenn Sie Ihre Dateien schreiben und dann auflisten, kann dies nicht garantieren, dass alle aufgelistet werden. siehe [dies | docs.aws.amazon.com/AmazonS3/latest/dev/…
LiranBo
3

Die Spark- df.write()API erstellt mehrere Teiledateien innerhalb des angegebenen Pfads ... um zu erzwingen, dass Spark nur eine einzelne Teiledatei verwendet, df.coalesce(1).write.csv(...)anstatt dass df.repartition(1).write.csv(...)Coalesce eine enge Transformation ist, während Repartition eine umfassende Transformation ist, siehe Spark - repartition () vs coalesce ()

df.coalesce(1).write.csv(filepath,header=True) 

erstellt einen Ordner im angegebenen Dateipfad mit einer part-0001-...-c000.csvDateiverwendung

cat filepath/part-0001-...-c000.csv > filename_you_want.csv 

einen benutzerfreundlichen Dateinamen haben

pprasad009
quelle
Wenn der Datenrahmen nicht zu groß ist (~ GBs oder in den Treiberspeicher passen), können Sie auch eine df.toPandas().to_csv(path)einzelne CSV mit Ihrem bevorzugten Dateinamen schreiben
pprasad009
Ugh, so frustrierend, wie dies nur durch die Umstellung auf Pandas erreicht werden kann. Wie schwer ist es, eine Datei ohne UUID zu schreiben?
Ijoseph
2

vor dem Speichern auf 1 Partition neu partitionieren / zusammenführen (Sie würden immer noch einen Ordner erhalten, der jedoch eine Teiledatei enthalten würde)

Arnon Rotem-Gal-Oz
quelle
2

Sie können verwenden rdd.coalesce(1, true).saveAsTextFile(path)

Es speichert Daten als einzelne Datei in Pfad / Teil-00000

Gourav
quelle
1
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.spark.sql.{DataFrame,SaveMode,SparkSession}
import org.apache.spark.sql.functions._

Ich habe mit dem folgenden Ansatz gelöst (hdfs Dateiname umbenennen): -

Schritt 1: - (Crate Data Frame und Schreiben in HDFS)

df.coalesce(1).write.format("csv").option("header", "false").mode(SaveMode.Overwrite).save("/hdfsfolder/blah/")

Schritt 2: - (Hadoop-Konfiguration erstellen)

val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)

Schritt 3: - (Pfad im HDFS-Ordnerpfad abrufen)

val pathFiles = new Path("/hdfsfolder/blah/")

Schritt 4: - (Spark-Dateinamen aus dem HDFS-Ordner abrufen)

val fileNames = hdfs.listFiles(pathFiles, false)
println(fileNames)

setp5: - (Erstellen Sie eine veränderbare Scala-Liste, um alle Dateinamen zu speichern und zur Liste hinzuzufügen.)

    var fileNamesList = scala.collection.mutable.MutableList[String]()
    while (fileNames.hasNext) {
      fileNamesList += fileNames.next().getPath.getName
    }
    println(fileNamesList)

Schritt 6: - (Filter _SUCESS Dateireihenfolge aus Dateinamen Scala Liste)

    // get files name which are not _SUCCESS
    val partFileName = fileNamesList.filterNot(filenames => filenames == "_SUCCESS")

Schritt 7: - (Scala-Liste in Zeichenfolge konvertieren und gewünschten Dateinamen zur Zeichenfolge des HDFS-Ordners hinzufügen und dann umbenennen)

val partFileSourcePath = new Path("/yourhdfsfolder/"+ partFileName.mkString(""))
    val desiredCsvTargetPath = new Path(/yourhdfsfolder/+ "op_"+ ".csv")
    hdfs.rename(partFileSourcePath , desiredCsvTargetPath)
sri hari kali charan Tummala
quelle
1

Diese Antwort erweitert die akzeptierte Antwort, bietet mehr Kontext und Code-Snippets, die Sie in der Spark-Shell auf Ihrem Computer ausführen können.

Mehr Kontext zur akzeptierten Antwort

Die akzeptierte Antwort könnte den Eindruck erwecken, dass der Beispielcode eine einzelne mydata.csvDatei ausgibt, und das ist nicht der Fall. Lassen Sie uns demonstrieren:

val df = Seq("one", "two", "three").toDF("num")
df
  .repartition(1)
  .write.csv(sys.env("HOME")+ "/Documents/tmp/mydata.csv")

Folgendes wird ausgegeben:

Documents/
  tmp/
    mydata.csv/
      _SUCCESS
      part-00000-b3700504-e58b-4552-880b-e7b52c60157e-c000.csv

NB mydata.csvist ein Ordner in der akzeptierten Antwort - es ist keine Datei!

So geben Sie eine einzelne Datei mit einem bestimmten Namen aus

Wir können spark-daria verwenden , um eine einzelne mydata.csvDatei zu schreiben .

import com.github.mrpowers.spark.daria.sql.DariaWriters
DariaWriters.writeSingleFile(
    df = df,
    format = "csv",
    sc = spark.sparkContext,
    tmpFolder = sys.env("HOME") + "/Documents/better/staging",
    filename = sys.env("HOME") + "/Documents/better/mydata.csv"
)

Dadurch wird die Datei wie folgt ausgegeben:

Documents/
  better/
    mydata.csv

S3-Pfade

Sie müssen s3a-Pfade übergeben DariaWriters.writeSingleFile, um diese Methode in S3 verwenden zu können:

DariaWriters.writeSingleFile(
    df = df,
    format = "csv",
    sc = spark.sparkContext,
    tmpFolder = "s3a://bucket/data/src",
    filename = "s3a://bucket/data/dest/my_cool_file.csv"
)

Siehe hier für weitere Informationen.

CopyMerge vermeiden

copyMerge wurde aus Hadoop 3 entfernt. Die DariaWriters.writeSingleFileImplementierung verwendet fs.rename, wie hier beschrieben . Spark 3 verwendete immer noch Hadoop 2 , sodass CopyMerge-Implementierungen im Jahr 2020 funktionieren werden. Ich bin nicht sicher, wann Spark auf Hadoop 3 aktualisiert wird, aber es ist besser, jeden copyMerge-Ansatz zu vermeiden, der dazu führt, dass Ihr Code beim Upgrade von Spark Hadoop beschädigt wird.

Quellcode

Suchen Sie DariaWritersim Spark-Daria-Quellcode nach dem Objekt, wenn Sie die Implementierung überprüfen möchten.

PySpark-Implementierung

Mit PySpark ist es einfacher, eine einzelne Datei zu schreiben, da Sie den DataFrame in einen Pandas DataFrame konvertieren können, der standardmäßig als einzelne Datei geschrieben wird.

from pathlib import Path
home = str(Path.home())
data = [
    ("jellyfish", "JALYF"),
    ("li", "L"),
    ("luisa", "LAS"),
    (None, None)
]
df = spark.createDataFrame(data, ["word", "expected"])
df.toPandas().to_csv(home + "/Documents/tmp/mydata-from-pyspark.csv", sep=',', header=True, index=False)

Einschränkungen

Der DariaWriters.writeSingleFileScala-Ansatz und der df.toPandas()Python-Ansatz funktionieren nur für kleine Datensätze. Riesige Datensätze können nicht als einzelne Dateien ausgeschrieben werden. Das Schreiben von Daten als einzelne Datei ist aus Sicht der Leistung nicht optimal, da die Daten nicht parallel geschrieben werden können.

Befugnisse
quelle
0

Ich verwende dies in Python, um eine einzelne Datei zu erhalten:

df.toPandas().to_csv("/tmp/my.csv", sep=',', header=True, index=False)
Kees C. Bakker
quelle
0

Mit Listbuffer können wir Daten in einer einzigen Datei speichern:

import java.io.FileWriter
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer
    val text = spark.read.textFile("filepath")
    var data = ListBuffer[String]()
    for(line:String <- text.collect()){
      data += line
    }
    val writer = new FileWriter("filepath")
    data.foreach(line => writer.write(line.toString+"\n"))
    writer.close()
Siddhu Salvi
quelle
-2

Es gibt noch eine Möglichkeit, Java zu verwenden

import java.io._

def printToFile(f: java.io.File)(op: java.io.PrintWriter => Unit) 
  {
     val p = new java.io.PrintWriter(f);  
     try { op(p) } 
     finally { p.close() }
  } 

printToFile(new File("C:/TEMP/df.csv")) { p => df.collect().foreach(p.println)}
Sergio Aljoschkin
quelle
Name 'wahr' ist nicht definiert
Arron