So überschreiben Sie das Ausgabeverzeichnis in spark

106

Ich habe eine Spark-Streaming-Anwendung, die für jede Minute einen Datensatz erstellt. Ich muss die Ergebnisse der verarbeiteten Daten speichern / überschreiben.

Beim Versuch, das Dataset org.apache.hadoop.mapred.FileAlreadyExistsException zu überschreiben, wird die Ausführung gestoppt.

Ich habe die Spark-Eigenschaft festgelegt set("spark.files.overwrite","true"), aber es gibt kein Glück.

Wie kann ich die Dateien von Spark überschreiben oder vorab löschen?

Vijay Innamuri
quelle
1
Ja, es ist scheiße, nicht wahr? Ich halte es für eine Regression auf 0.9.0. Bitte akzeptieren Sie meine Antwort :)
Samthebest
set("spark.files.overwrite","true")funktioniert nur für Dateien, die durchspark.addFile()
aiman

Antworten:

106

UPDATE: Schlagen Sie vor Dataframes, plus etwas wie ... .write.mode(SaveMode.Overwrite) ....

Handlicher Zuhälter:

implicit class PimpedStringRDD(rdd: RDD[String]) {
    def write(p: String)(implicit ss: SparkSession): Unit = {
      import ss.implicits._
      rdd.toDF().as[String].write.mode(SaveMode.Overwrite).text(p)
    }
  }

Für ältere Versionen versuchen

yourSparkConf.set("spark.hadoop.validateOutputSpecs", "false")
val sc = SparkContext(yourSparkConf)

In 1.1.0 können Sie die Conf-Einstellungen mithilfe des Spark-Submit-Skripts mit dem Flag --conf festlegen.

WARNUNG (ältere Versionen): Laut @piggybox gibt es einen Fehler in Spark, bei dem nur Dateien überschrieben werden, die zum Schreiben der part-Dateien erforderlich sind. Alle anderen Dateien werden nicht entfernt.

samthebest
quelle
28
Für Spark 1.4:df.write.mode(SaveMode.Overwrite).parquet(path)
Ha Pham
Für Spark SQL haben Sie Optionen zum Definieren des SaveMode für Core Spark, für den Sie so etwas nicht haben. Möchte wirklich einige dieser Funktionen für saveAsTextFile und andere Transformationen
Murtaza Kanchwala
3
Ein verstecktes Problem: Im Vergleich zu @ pzecevics Lösung zum Löschen des gesamten Ordners über HDFS überschreibt Spark bei diesem Ansatz nur die Teiledateien mit demselben Dateinamen im Ausgabeordner. Dies funktioniert meistens, aber wenn sich im Ordner noch etwas anderes befindet, z. B. zusätzliche Teiledateien von einem anderen Spark / Hadoop-Job, werden diese Dateien nicht überschrieben.
Piggybox
6
Sie können auch den df.write.mode(mode: String).parquet(path)Where-Modus verwenden: String kann sein: "überschreiben", "anhängen", "ignorieren", "Fehler".
Roggen
1
@avocado Yup denke so, die Spark-APIs werden mit jeder Veröffentlichung immer schlechter: P
samthebest
27

In der Dokumentation für den Parameter spark.files.overwriteheißt es: "Gibt an, ob Dateien überschrieben werden sollen, die hinzugefügt wurden, SparkContext.addFile()wenn die Zieldatei vorhanden ist und deren Inhalt nicht mit dem der Quelle übereinstimmt." Es hat also keine Auswirkungen auf die saveAsTextFiles-Methode.

Sie können dies tun, bevor Sie die Datei speichern:

val hadoopConf = new org.apache.hadoop.conf.Configuration()
val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://localhost:9000"), hadoopConf)
try { hdfs.delete(new org.apache.hadoop.fs.Path(filepath), true) } catch { case _ : Throwable => { } }

Aas hier erklärt: http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-make-Spark-1-0-saveAsTextFile-to-overwrite-existing-file-td6696. html

pzecevic
quelle
29
was ist mit pyspark?
Javadba
Die nächste Antwort auf 'write.mode (SaveMode.Overwrite)' ist der
richtige
hdfs löscht möglicherweise die neuen Dateien, sobald sie eingehen, da die alten noch gelöscht werden.
Jake
24

In der Dokumentation zu pyspark.sql.DataFrame.save (derzeit 1.3.1) können Sie mode='overwrite'beim Speichern eines DataFrame Folgendes angeben:

myDataFrame.save(path='myPath', source='parquet', mode='overwrite')

Ich habe überprüft, dass dadurch sogar übrig gebliebene Partitionsdateien entfernt werden. Wenn Sie also ursprünglich 10 Partitionen / Dateien angegeben haben, dann aber den Ordner mit einem DataFrame überschrieben haben, der nur 6 Partitionen enthält, enthält der resultierende Ordner die 6 Partitionen / Dateien.

Weitere Informationen zu den Modusoptionen finden Sie in der Spark SQL-Dokumentation .

dnlbrky
quelle
2
Richtig und hilfreich, danke, aber eine DataFrame-spezifische Lösung - spark.hadoop.validateOutputSpecsfunktioniert über alle Spark-APIs hinweg.
Samthebest
Aus irgendeinem Grund hat spark.hadoop.validateOutputSpecses bei 1.3 nicht funktioniert, aber das tut es.
Eric Walker
1
@samthebest Mit der save(... , mode=Route können Sie einen Satz von Dateien überschreiben, einen anderen anhängen usw. im selben Spark-Kontext. Würden Sie sich nicht spark.hadoop.validateOutputSpecsauf nur einen Modus pro Kontext beschränken?
dnlbrky
1
@dnlbrky Das OP hat nicht um Anhängen gebeten. Wie gesagt, wahr, nützlich, aber unnötig. Wenn das OP gefragt würde, wie ich anhängen soll, könnte eine ganze Reihe von Antworten gegeben werden. Aber lassen Sie uns nicht darauf eingehen. Außerdem rate ich Ihnen, die Verwendung der Scala-Version von DataFrames in Betracht zu ziehen, da diese über Typensicherheit und mehr Überprüfung verfügt. Wenn Sie beispielsweise einen Tippfehler beim "Überschreiben" hatten, würden Sie dies erst herausfinden, wenn diese DAG ausgewertet wurde. Dies könnte in einem Big-Data-Job der Fall sein 2 Stunden später sein !! Wenn Sie die Scala-Version verwenden, überprüft der Compiler alles im Voraus! Ziemlich cool und sehr wichtig für Big Data.
Samthebest
13

df.write.mode('overwrite').parquet("/output/folder/path")funktioniert, wenn Sie eine Parkettdatei mit Python überschreiben möchten. Dies ist in Funken 1.6.2. Die API kann in späteren Versionen unterschiedlich sein

akn
quelle
4
  val jobName = "WordCount";
  //overwrite the output directory in spark  set("spark.hadoop.validateOutputSpecs", "false")
  val conf = new 
  SparkConf().setAppName(jobName).set("spark.hadoop.validateOutputSpecs", "false");
  val sc = new SparkContext(conf)
Vaquar Khan
quelle
Nur für Spark 1, in der neuesten Version verwendendf.write.mode(SaveMode.Overwrite)
ChikuMiku
2

Diese überladene Version der Speicherfunktion funktioniert bei mir:

yourDF.save (outputPath, org.apache.spark.sql.SaveMode.valueOf ("Overwrite"))

Das obige Beispiel würde einen vorhandenen Ordner überschreiben. Der Savemode kann auch diese Parameter übernehmen ( https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/SaveMode.html ):

Anhängen : Der Anhänge-Modus bedeutet, dass beim Speichern eines DataFrame in einer Datenquelle, sofern bereits Daten / Tabellen vorhanden sind, erwartet wird, dass der Inhalt des DataFrame an vorhandene Daten angehängt wird.

ErrorIfExists : Der ErrorIfExists-Modus bedeutet, dass beim Speichern eines DataFrame in einer Datenquelle, wenn bereits Daten vorhanden sind, eine Ausnahme erwartet wird.

Ignorieren : Der Ignoriermodus bedeutet, dass beim Speichern eines Datenrahmens in einer Datenquelle, wenn bereits Daten vorhanden sind, erwartet wird, dass der Speichervorgang den Inhalt des Datenrahmens nicht speichert und die vorhandenen Daten nicht ändert.

Shay
quelle
1

Wenn Sie bereit sind, Ihr eigenes benutzerdefiniertes Ausgabeformat zu verwenden, können Sie das gewünschte Verhalten auch mit RDD erzielen.

Schauen Sie sich die folgenden Klassen an: FileOutputFormat , FileOutputCommitter

Im Dateiausgabeformat haben Sie eine Methode namens checkOutputSpecs, die überprüft, ob das Ausgabeverzeichnis vorhanden ist. In FileOutputCommitter haben Sie den commitJob, der normalerweise Daten aus dem temporären Verzeichnis an seinen endgültigen Speicherort überträgt.

Ich konnte es noch nicht verifizieren (würde es tun, sobald ich ein paar freie Minuten habe), aber theoretisch: Wenn ich FileOutputFormat erweitere und checkOutputSpecs auf eine Methode überschreibe, die keine Ausnahme für das bereits vorhandene Verzeichnis auslöst, und das anpassen commitJob-Methode meines benutzerdefinierten Ausgabe-Committers, um die gewünschte Logik auszuführen (z. B. einige der Dateien überschreiben, andere anhängen), um möglicherweise auch mit RDDs das gewünschte Verhalten zu erzielen.

Das Ausgabeformat wird übergeben an: saveAsNewAPIHadoopFile (dies ist auch die Methode saveAsTextFile, die aufgerufen wird, um die Dateien tatsächlich zu speichern). Der Output Committer wird auf Anwendungsebene konfiguriert.

Michael Kopaniov
quelle
Ich würde es vermeiden, FileOutputCommitter in die Unterklasse zu bringen, wenn Sie helfen können: Das ist ein beängstigendes Stück Code. Hadoop 3.0 fügt einen Plugin-Punkt hinzu, an dem FileOutputFormat verschiedene Implementierungen einer überarbeiteten Superklasse (PathOutputCommitter) ausführen kann. Der S3 von Netflix wird
direkt