Wie kann man saveAsTextFile dazu bringen, die Ausgabe NICHT in mehrere Dateien aufzuteilen?

78

Wenn Sie Scala in Spark verwenden und die Ergebnisse mit verwenden saveAsTextFile, scheint die Ausgabe in mehrere Teile aufgeteilt zu werden. Ich übergebe nur einen Parameter (Pfad).

val year = sc.textFile("apat63_99.txt").map(_.split(",")(1)).flatMap(_.split(",")).map((_,1)).reduceByKey((_+_)).map(_.swap)
year.saveAsTextFile("year")
  1. Entspricht die Anzahl der Ausgänge der Anzahl der verwendeten Reduzierungen?
  2. Bedeutet dies, dass die Ausgabe komprimiert ist?
  3. Ich weiß, dass ich die Ausgabe mit Bash kombinieren kann, aber gibt es eine Option, um die Ausgabe in einer einzelnen Textdatei zu speichern, ohne sie zu teilen? Ich habe mir die API-Dokumente angesehen, aber es sagt nicht viel darüber aus.
user2773013
quelle
2
Es ist im Allgemeinen eine schlechte Praxis, nur eine Datei in Big Data zu verwenden, wenn diese Datei groß ist.
Samthebest
Was ist dann die beste Vorgehensweise, wenn die Ausgabe beispielsweise eine sortierte Datei war? Behalten Sie es als eine Sammlung von Dateien bei und machen Sie die vielen Namen der Ausgabedateien zu einer Art Index (dh so etwas wie die erste Datei heißt "aa", die mittlere heißt "fg", die letzte "zzy")?
Rdesmond
Es ist häufig der Fall, dass ein Heavy-Spark-Job nur eine sehr kleine Ausgabe (Aggregation, kpis, Popularitäten, ...) generiert, die auf HDFS erzeugt wird, aber höchstwahrscheinlich von Anwendungen verwendet wird, die nicht mit Big Data zusammenhängen. In diesem Fall ist es sauberer und einfacher, eine gut benannte Einzeldatei für Übertragungen und Verbrauch zu haben.
Xavier Guihot

Antworten:

100

Der Grund, warum es als mehrere Dateien gespeichert wird, liegt darin, dass die Berechnung verteilt wird. Wenn die Ausgabe so klein ist, dass Sie glauben, sie auf einem Computer installieren zu können, können Sie Ihr Programm mit beenden

val arr = year.collect()

Speichern Sie dann das resultierende Array als Datei. Eine andere Möglichkeit wäre die Verwendung eines benutzerdefinierten Partitionierers. partitionBy und ihn so zu gestalten, dass alles auf eine Partition übertragen wird. Dies ist jedoch nicht ratsam, da Sie keine Parallelisierung erhalten.

Wenn Sie möchten, dass die Datei mit gespeichert wird saveAsTextFile, können Sie verwenden coalesce(1,true).saveAsTextFile(). Dies bedeutet im Grunde, dass die Berechnung dann zu einer Partition zusammengeführt wird. Sie können auch repartition(1)einen Wrapper verwenden, für den coalescedas Shuffle-Argument auf true gesetzt ist. Wenn ich mir die Quelle von RDD.scala anschaue, habe ich das meiste herausgefunden. Sie sollten einen Blick darauf werfen.

Aaronman
quelle
2
Wie speichert man ein Array als Textdatei? Es gibt keine saveAsTextFile-Funktion für ein Array. nur für RDD.
user2773013
5
@ user2773013 Nun, der Ansatz dafür wäre coalesceoder der partitionAnsatz, den ich vorgeschlagen habe, aber es macht wirklich keinen Sinn, auf HDFS zu speichern, wenn es nur auf einem Knoten ist, weshalb die Verwendung von
Collect
Sehr nützliche Antwort ... Ich habe in den Tutorials, die ich gelesen habe, keine PartitionBy oder Coalesce gesehen ...
35

Für diejenigen, die mit einem größeren Datensatz arbeiten :

  • rdd.collect()sollte in diesem Fall nicht verwendet werden , da alle Daten Arrayim Treiber erfasst werden. Dies ist der einfachste Weg, um aus dem Speicher herauszukommen.

  • rdd.coalesce(1).saveAsTextFile() sollte auch nicht verwendet werden, da die Parallelität von Upstream-Stufen verloren geht und auf einem einzelnen Knoten ausgeführt wird, von dem aus Daten gespeichert werden.

  • rdd.coalesce(1, shuffle = true).saveAsTextFile() ist die beste einfache Option, da die Verarbeitung von Upstream-Aufgaben parallel bleibt und dann nur das Mischen zu einem Knoten durchgeführt wird ( rdd.repartition(1).saveAsTextFile()ist ein genaues Synonym).

  • rdd.saveAsSingleTextFile()Wie unten angegeben, können Sie die Festplatte zusätzlich in einer einzelnen Datei mit einem bestimmten Namen speichern, wobei die Parallelitätseigenschaften von beibehalten werden rdd.coalesce(1, shuffle = true).saveAsTextFile().


Etwas, das unpraktisch sein kann, rdd.coalesce(1, shuffle = true).saveAsTextFile("path/to/file.txt")ist, dass es tatsächlich eine Datei erzeugt, deren Pfad ist path/to/file.txt/part-00000und nicht path/to/file.txt.

Die folgende Lösung erzeugt rdd.saveAsSingleTextFile("path/to/file.txt")tatsächlich eine Datei mit dem Pfad path/to/file.txt:

package com.whatever.package

import org.apache.spark.rdd.RDD
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.hadoop.io.compress.CompressionCodec

object SparkHelper {

  // This is an implicit class so that saveAsSingleTextFile can be attached to
  // SparkContext and be called like this: sc.saveAsSingleTextFile
  implicit class RDDExtensions(val rdd: RDD[String]) extends AnyVal {

    def saveAsSingleTextFile(path: String): Unit =
      saveAsSingleTextFileInternal(path, None)

    def saveAsSingleTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit =
      saveAsSingleTextFileInternal(path, Some(codec))

    private def saveAsSingleTextFileInternal(
        path: String, codec: Option[Class[_ <: CompressionCodec]]
    ): Unit = {

      // The interface with hdfs:
      val hdfs = FileSystem.get(rdd.sparkContext.hadoopConfiguration)

      // Classic saveAsTextFile in a temporary folder:
      hdfs.delete(new Path(s"$path.tmp"), true) // to make sure it's not there already
      codec match {
        case Some(codec) => rdd.saveAsTextFile(s"$path.tmp", codec)
        case None        => rdd.saveAsTextFile(s"$path.tmp")
      }

      // Merge the folder of resulting part-xxxxx into one file:
      hdfs.delete(new Path(path), true) // to make sure it's not there already
      FileUtil.copyMerge(
        hdfs, new Path(s"$path.tmp"),
        hdfs, new Path(path),
        true, rdd.sparkContext.hadoopConfiguration, null
      )
      // Working with Hadoop 3?: https://stackoverflow.com/a/50545815/9297144

      hdfs.delete(new Path(s"$path.tmp"), true)
    }
  }
}

die so verwendet werden kann:

import com.whatever.package.SparkHelper.RDDExtensions

rdd.saveAsSingleTextFile("path/to/file.txt")
// Or if the produced file is to be compressed:
import org.apache.hadoop.io.compress.GzipCodec
rdd.saveAsSingleTextFile("path/to/file.txt.gz", classOf[GzipCodec])

Dieser Ausschnitt:

  • Speichert zuerst die Festplatte mit rdd.saveAsTextFile("path/to/file.txt")in einem temporären Ordner, path/to/file.txt.tmpals ob wir keine Daten in einer Datei speichern wollten (wodurch die Verarbeitung von Upstream-Aufgaben parallel bleibt).

  • Und dann fahren wir nur mit der Hadoop-Dateisystem-API mit dem Zusammenführen ( FileUtil.copyMerge()) der verschiedenen Ausgabedateien fort, um unsere endgültige Ausgabedatei zu erstellen path/to/file.txt.

Xavier Guihot
quelle
22

Sie könnten anrufen coalesce(1)und dann saveAsTextFile()- aber es könnte eine schlechte Idee sein, wenn Sie viele Daten haben. Separate Dateien pro Split werden wie in Hadoop generiert, damit separate Mapper und Reduzierer in verschiedene Dateien schreiben können. Eine einzige Ausgabedatei ist nur dann eine gute Idee, wenn Sie nur über sehr wenige Daten verfügen. In diesem Fall könnten Sie auch collect () ausführen, wie @aaronman sagte.

marekinfo
quelle
Nizza dachte nicht daran, coalescesauberer zu sein, als mit dem Partitionierer collect
herumzuspielen. Trotzdem
1
das funktioniert. Wenn Sie jedoch Koaleszenz verwenden, bedeutet dies, dass Sie nur 1 Reduzierer verwenden. Würde dies nicht den Prozess verlangsamen, da nur 1 Reduzierstück verwendet wird?
user2773013
1
Ja, aber darum bitten Sie. Spark gibt eine Datei pro Partition aus. Warum interessiert Sie andererseits die Anzahl der Dateien? Wenn Sie Dateien in Spark lesen, können Sie einfach das übergeordnete Verzeichnis angeben und alle Partitionen werden als einzelne RDD gelesen
David
1
Bitte nicht coalesce(1), es sei denn, Sie wissen, was Sie tun .
Gsamaras
4

Wie bereits erwähnt, können Sie Ihren Datensatz sammeln oder zusammenführen, um Spark zu zwingen, eine einzelne Datei zu erstellen. Dies begrenzt jedoch auch die Anzahl der Spark-Aufgaben, die parallel an Ihrem Dataset ausgeführt werden können. Ich bevorzuge es, hundert Dateien im Ausgabe-HDFS-Verzeichnis erstellen zu lassen und dann hadoop fs -getmerge /hdfs/dir /local/file.txtdie Ergebnisse in eine einzelne Datei im lokalen Dateisystem zu extrahieren. Dies ist am sinnvollsten, wenn Ihre Ausgabe natürlich ein relativ kleiner Bericht ist.

Matt
quelle
2

Sie können folgendermaßen anrufen repartition()und folgen:

val year = sc.textFile("apat63_99.txt").map(_.split(",")(1)).flatMap(_.split(",")).map((_,1)).reduceByKey((_+_)).map(_.swap)

var repartitioned = year.repartition(1)
repartitioned.saveAsTextFile("C:/Users/TheBhaskarDas/Desktop/wc_spark00")

Geben Sie hier die Bildbeschreibung ein

Bhaskar Das
quelle
1

Sie können dies in der nächsten Version von Spark tun. In der aktuellen Version 1.0.0 ist dies nur möglich, wenn Sie es manuell ausführen, z. B. wie bereits erwähnt, mit einem Bash-Skriptaufruf.

gprivitera
quelle
2
Die nächste Version von Spark ist da und es ist nicht klar, wie es geht :(
Ciprian Tomoiagă
1

Ich möchte auch erwähnen, dass in der Dokumentation klar angegeben ist, dass Benutzer beim Aufrufen der Koaleszenz mit einer wirklich kleinen Anzahl von Partitionen vorsichtig sein sollten. Dies kann dazu führen, dass Upstream-Partitionen diese Anzahl von Partitionen erben.

Ich würde die Verwendung von Coalesce (1) nur empfehlen, wenn dies wirklich erforderlich ist.

JavaPlanet
quelle
1

In Spark 1.6.1 ist das Format wie unten gezeigt. Es wird eine einzelne Ausgabedatei erstellt. Es wird empfohlen, diese zu verwenden, wenn die Ausgabe klein genug ist, um verarbeitet zu werden. Grundsätzlich wird eine neue RDD zurückgegeben, die in numPartitions-Partitionen reduziert wird. Wenn Sie eine drastische Koaleszenz durchführen, zB bei numPartitions = 1 kann dies dazu führen, dass Ihre Berechnung auf weniger Knoten stattfindet, als Sie möchten (z. B. einem Knoten bei numPartitions = 1).

pair_result.coalesce(1).saveAsTextFile("/app/data/")
Aravind Krishnakumar
quelle
0

Hier ist meine Antwort, um eine einzelne Datei auszugeben. Ich habe gerade hinzugefügtcoalesce(1)

val year = sc.textFile("apat63_99.txt")
              .map(_.split(",")(1))
              .flatMap(_.split(","))
              .map((_,1))
              .reduceByKey((_+_)).map(_.swap)
year.saveAsTextFile("year")

Code:

year.coalesce(1).saveAsTextFile("year")
Ian Mendoza
quelle