Wenn Sie Scala in Spark verwenden und die Ergebnisse mit verwenden saveAsTextFile
, scheint die Ausgabe in mehrere Teile aufgeteilt zu werden. Ich übergebe nur einen Parameter (Pfad).
val year = sc.textFile("apat63_99.txt").map(_.split(",")(1)).flatMap(_.split(",")).map((_,1)).reduceByKey((_+_)).map(_.swap)
year.saveAsTextFile("year")
- Entspricht die Anzahl der Ausgänge der Anzahl der verwendeten Reduzierungen?
- Bedeutet dies, dass die Ausgabe komprimiert ist?
- Ich weiß, dass ich die Ausgabe mit Bash kombinieren kann, aber gibt es eine Option, um die Ausgabe in einer einzelnen Textdatei zu speichern, ohne sie zu teilen? Ich habe mir die API-Dokumente angesehen, aber es sagt nicht viel darüber aus.
scala
apache-spark
user2773013
quelle
quelle
Antworten:
Der Grund, warum es als mehrere Dateien gespeichert wird, liegt darin, dass die Berechnung verteilt wird. Wenn die Ausgabe so klein ist, dass Sie glauben, sie auf einem Computer installieren zu können, können Sie Ihr Programm mit beenden
val arr = year.collect()
Speichern Sie dann das resultierende Array als Datei. Eine andere Möglichkeit wäre die Verwendung eines benutzerdefinierten Partitionierers.
partitionBy
und ihn so zu gestalten, dass alles auf eine Partition übertragen wird. Dies ist jedoch nicht ratsam, da Sie keine Parallelisierung erhalten.Wenn Sie möchten, dass die Datei mit gespeichert wird
saveAsTextFile
, können Sie verwendencoalesce(1,true).saveAsTextFile()
. Dies bedeutet im Grunde, dass die Berechnung dann zu einer Partition zusammengeführt wird. Sie können auchrepartition(1)
einen Wrapper verwenden, für dencoalesce
das Shuffle-Argument auf true gesetzt ist. Wenn ich mir die Quelle von RDD.scala anschaue, habe ich das meiste herausgefunden. Sie sollten einen Blick darauf werfen.quelle
coalesce
oder derpartition
Ansatz, den ich vorgeschlagen habe, aber es macht wirklich keinen Sinn, auf HDFS zu speichern, wenn es nur auf einem Knoten ist, weshalb die Verwendung vonFür diejenigen, die mit einem größeren Datensatz arbeiten :
rdd.collect()
sollte in diesem Fall nicht verwendet werden , da alle DatenArray
im Treiber erfasst werden. Dies ist der einfachste Weg, um aus dem Speicher herauszukommen.rdd.coalesce(1).saveAsTextFile()
sollte auch nicht verwendet werden, da die Parallelität von Upstream-Stufen verloren geht und auf einem einzelnen Knoten ausgeführt wird, von dem aus Daten gespeichert werden.rdd.coalesce(1, shuffle = true).saveAsTextFile()
ist die beste einfache Option, da die Verarbeitung von Upstream-Aufgaben parallel bleibt und dann nur das Mischen zu einem Knoten durchgeführt wird (rdd.repartition(1).saveAsTextFile()
ist ein genaues Synonym).rdd.saveAsSingleTextFile()
Wie unten angegeben, können Sie die Festplatte zusätzlich in einer einzelnen Datei mit einem bestimmten Namen speichern, wobei die Parallelitätseigenschaften von beibehalten werdenrdd.coalesce(1, shuffle = true).saveAsTextFile()
.Etwas, das unpraktisch sein kann,
rdd.coalesce(1, shuffle = true).saveAsTextFile("path/to/file.txt")
ist, dass es tatsächlich eine Datei erzeugt, deren Pfad istpath/to/file.txt/part-00000
und nichtpath/to/file.txt
.Die folgende Lösung erzeugt
rdd.saveAsSingleTextFile("path/to/file.txt")
tatsächlich eine Datei mit dem Pfadpath/to/file.txt
:package com.whatever.package import org.apache.spark.rdd.RDD import org.apache.hadoop.fs.{FileSystem, FileUtil, Path} import org.apache.hadoop.io.compress.CompressionCodec object SparkHelper { // This is an implicit class so that saveAsSingleTextFile can be attached to // SparkContext and be called like this: sc.saveAsSingleTextFile implicit class RDDExtensions(val rdd: RDD[String]) extends AnyVal { def saveAsSingleTextFile(path: String): Unit = saveAsSingleTextFileInternal(path, None) def saveAsSingleTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit = saveAsSingleTextFileInternal(path, Some(codec)) private def saveAsSingleTextFileInternal( path: String, codec: Option[Class[_ <: CompressionCodec]] ): Unit = { // The interface with hdfs: val hdfs = FileSystem.get(rdd.sparkContext.hadoopConfiguration) // Classic saveAsTextFile in a temporary folder: hdfs.delete(new Path(s"$path.tmp"), true) // to make sure it's not there already codec match { case Some(codec) => rdd.saveAsTextFile(s"$path.tmp", codec) case None => rdd.saveAsTextFile(s"$path.tmp") } // Merge the folder of resulting part-xxxxx into one file: hdfs.delete(new Path(path), true) // to make sure it's not there already FileUtil.copyMerge( hdfs, new Path(s"$path.tmp"), hdfs, new Path(path), true, rdd.sparkContext.hadoopConfiguration, null ) // Working with Hadoop 3?: https://stackoverflow.com/a/50545815/9297144 hdfs.delete(new Path(s"$path.tmp"), true) } } }
die so verwendet werden kann:
import com.whatever.package.SparkHelper.RDDExtensions rdd.saveAsSingleTextFile("path/to/file.txt")
// Or if the produced file is to be compressed: import org.apache.hadoop.io.compress.GzipCodec rdd.saveAsSingleTextFile("path/to/file.txt.gz", classOf[GzipCodec])
Dieser Ausschnitt:
Speichert zuerst die Festplatte mit
rdd.saveAsTextFile("path/to/file.txt")
in einem temporären Ordner,path/to/file.txt.tmp
als ob wir keine Daten in einer Datei speichern wollten (wodurch die Verarbeitung von Upstream-Aufgaben parallel bleibt).Und dann fahren wir nur mit der Hadoop-Dateisystem-API mit dem Zusammenführen (
FileUtil.copyMerge()
) der verschiedenen Ausgabedateien fort, um unsere endgültige Ausgabedatei zu erstellenpath/to/file.txt
.quelle
Sie könnten anrufen
coalesce(1)
und dannsaveAsTextFile()
- aber es könnte eine schlechte Idee sein, wenn Sie viele Daten haben. Separate Dateien pro Split werden wie in Hadoop generiert, damit separate Mapper und Reduzierer in verschiedene Dateien schreiben können. Eine einzige Ausgabedatei ist nur dann eine gute Idee, wenn Sie nur über sehr wenige Daten verfügen. In diesem Fall könnten Sie auch collect () ausführen, wie @aaronman sagte.quelle
coalesce
sauberer zu sein, als mit dem Partitionierercollect
coalesce(1)
, es sei denn, Sie wissen, was Sie tun .Wie bereits erwähnt, können Sie Ihren Datensatz sammeln oder zusammenführen, um Spark zu zwingen, eine einzelne Datei zu erstellen. Dies begrenzt jedoch auch die Anzahl der Spark-Aufgaben, die parallel an Ihrem Dataset ausgeführt werden können. Ich bevorzuge es, hundert Dateien im Ausgabe-HDFS-Verzeichnis erstellen zu lassen und dann
hadoop fs -getmerge /hdfs/dir /local/file.txt
die Ergebnisse in eine einzelne Datei im lokalen Dateisystem zu extrahieren. Dies ist am sinnvollsten, wenn Ihre Ausgabe natürlich ein relativ kleiner Bericht ist.quelle
Sie können folgendermaßen anrufen
repartition()
und folgen:val year = sc.textFile("apat63_99.txt").map(_.split(",")(1)).flatMap(_.split(",")).map((_,1)).reduceByKey((_+_)).map(_.swap) var repartitioned = year.repartition(1) repartitioned.saveAsTextFile("C:/Users/TheBhaskarDas/Desktop/wc_spark00")
quelle
Sie können dies in der nächsten Version von Spark tun. In der aktuellen Version 1.0.0 ist dies nur möglich, wenn Sie es manuell ausführen, z. B. wie bereits erwähnt, mit einem Bash-Skriptaufruf.
quelle
Ich möchte auch erwähnen, dass in der Dokumentation klar angegeben ist, dass Benutzer beim Aufrufen der Koaleszenz mit einer wirklich kleinen Anzahl von Partitionen vorsichtig sein sollten. Dies kann dazu führen, dass Upstream-Partitionen diese Anzahl von Partitionen erben.
Ich würde die Verwendung von Coalesce (1) nur empfehlen, wenn dies wirklich erforderlich ist.
quelle
In Spark 1.6.1 ist das Format wie unten gezeigt. Es wird eine einzelne Ausgabedatei erstellt. Es wird empfohlen, diese zu verwenden, wenn die Ausgabe klein genug ist, um verarbeitet zu werden. Grundsätzlich wird eine neue RDD zurückgegeben, die in numPartitions-Partitionen reduziert wird. Wenn Sie eine drastische Koaleszenz durchführen, zB bei numPartitions = 1 kann dies dazu führen, dass Ihre Berechnung auf weniger Knoten stattfindet, als Sie möchten (z. B. einem Knoten bei numPartitions = 1).
pair_result.coalesce(1).saveAsTextFile("/app/data/")
quelle
Hier ist meine Antwort, um eine einzelne Datei auszugeben. Ich habe gerade hinzugefügt
coalesce(1)
val year = sc.textFile("apat63_99.txt") .map(_.split(",")(1)) .flatMap(_.split(",")) .map((_,1)) .reduceByKey((_+_)).map(_.swap) year.saveAsTextFile("year")
Code:
year.coalesce(1).saveAsTextFile("year")
quelle