Ich habe eine Spark-Streaming-Anwendung, die für jede Minute einen Datensatz erstellt. Ich muss die Ergebnisse der verarbeiteten Daten speichern / überschreiben.
Beim Versuch, das Dataset org.apache.hadoop.mapred.FileAlreadyExistsException zu überschreiben, wird die Ausführung gestoppt.
Ich habe die Spark-Eigenschaft festgelegt set("spark.files.overwrite","true")
, aber es gibt kein Glück.
Wie kann ich die Dateien von Spark überschreiben oder vorab löschen?
apache-spark
Vijay Innamuri
quelle
quelle
set("spark.files.overwrite","true")
funktioniert nur für Dateien, die durchspark.addFile()
Antworten:
UPDATE: Schlagen Sie vor
Dataframes
, plus etwas wie... .write.mode(SaveMode.Overwrite) ...
.Handlicher Zuhälter:
Für ältere Versionen versuchen
In 1.1.0 können Sie die Conf-Einstellungen mithilfe des Spark-Submit-Skripts mit dem Flag --conf festlegen.
WARNUNG (ältere Versionen): Laut @piggybox gibt es einen Fehler in Spark, bei dem nur Dateien überschrieben werden, die zum Schreiben der
part-
Dateien erforderlich sind. Alle anderen Dateien werden nicht entfernt.quelle
Spark 1.4
:df.write.mode(SaveMode.Overwrite).parquet(path)
df.write.mode(mode: String).parquet(path)
Where-Modus verwenden: String kann sein: "überschreiben", "anhängen", "ignorieren", "Fehler".da
df.save(path, source, mode)
ist veraltet ( http://spark.apache.org/docs/1.5.0/api/scala/index.html#org.apache.spark.sql.DataFrame )Verwenden Sie dort,
df.write.format(source).mode("overwrite").save(path)
wo df.write DataFrameWriter ist
'Quelle' kann sein ("com.databricks.spark.avro" | "Parkett" | "json")
quelle
source
kann auch seincsv
In der Dokumentation für den Parameter
spark.files.overwrite
heißt es: "Gibt an, ob Dateien überschrieben werden sollen, die hinzugefügt wurden,SparkContext.addFile()
wenn die Zieldatei vorhanden ist und deren Inhalt nicht mit dem der Quelle übereinstimmt." Es hat also keine Auswirkungen auf die saveAsTextFiles-Methode.Sie können dies tun, bevor Sie die Datei speichern:
Aas hier erklärt: http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-make-Spark-1-0-saveAsTextFile-to-overwrite-existing-file-td6696. html
quelle
In der Dokumentation zu pyspark.sql.DataFrame.save (derzeit 1.3.1) können Sie
mode='overwrite'
beim Speichern eines DataFrame Folgendes angeben:Ich habe überprüft, dass dadurch sogar übrig gebliebene Partitionsdateien entfernt werden. Wenn Sie also ursprünglich 10 Partitionen / Dateien angegeben haben, dann aber den Ordner mit einem DataFrame überschrieben haben, der nur 6 Partitionen enthält, enthält der resultierende Ordner die 6 Partitionen / Dateien.
Weitere Informationen zu den Modusoptionen finden Sie in der Spark SQL-Dokumentation .
quelle
spark.hadoop.validateOutputSpecs
funktioniert über alle Spark-APIs hinweg.spark.hadoop.validateOutputSpecs
es bei 1.3 nicht funktioniert, aber das tut es.save(... , mode=
Route können Sie einen Satz von Dateien überschreiben, einen anderen anhängen usw. im selben Spark-Kontext. Würden Sie sich nichtspark.hadoop.validateOutputSpecs
auf nur einen Modus pro Kontext beschränken?df.write.mode('overwrite').parquet("/output/folder/path")
funktioniert, wenn Sie eine Parkettdatei mit Python überschreiben möchten. Dies ist in Funken 1.6.2. Die API kann in späteren Versionen unterschiedlich seinquelle
quelle
df.write.mode(SaveMode.Overwrite)
Diese überladene Version der Speicherfunktion funktioniert bei mir:
yourDF.save (outputPath, org.apache.spark.sql.SaveMode.valueOf ("Overwrite"))
Das obige Beispiel würde einen vorhandenen Ordner überschreiben. Der Savemode kann auch diese Parameter übernehmen ( https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/SaveMode.html ):
Anhängen : Der Anhänge-Modus bedeutet, dass beim Speichern eines DataFrame in einer Datenquelle, sofern bereits Daten / Tabellen vorhanden sind, erwartet wird, dass der Inhalt des DataFrame an vorhandene Daten angehängt wird.
ErrorIfExists : Der ErrorIfExists-Modus bedeutet, dass beim Speichern eines DataFrame in einer Datenquelle, wenn bereits Daten vorhanden sind, eine Ausnahme erwartet wird.
Ignorieren : Der Ignoriermodus bedeutet, dass beim Speichern eines Datenrahmens in einer Datenquelle, wenn bereits Daten vorhanden sind, erwartet wird, dass der Speichervorgang den Inhalt des Datenrahmens nicht speichert und die vorhandenen Daten nicht ändert.
quelle
Wenn Sie bereit sind, Ihr eigenes benutzerdefiniertes Ausgabeformat zu verwenden, können Sie das gewünschte Verhalten auch mit RDD erzielen.
Schauen Sie sich die folgenden Klassen an: FileOutputFormat , FileOutputCommitter
Im Dateiausgabeformat haben Sie eine Methode namens checkOutputSpecs, die überprüft, ob das Ausgabeverzeichnis vorhanden ist. In FileOutputCommitter haben Sie den commitJob, der normalerweise Daten aus dem temporären Verzeichnis an seinen endgültigen Speicherort überträgt.
Ich konnte es noch nicht verifizieren (würde es tun, sobald ich ein paar freie Minuten habe), aber theoretisch: Wenn ich FileOutputFormat erweitere und checkOutputSpecs auf eine Methode überschreibe, die keine Ausnahme für das bereits vorhandene Verzeichnis auslöst, und das anpassen commitJob-Methode meines benutzerdefinierten Ausgabe-Committers, um die gewünschte Logik auszuführen (z. B. einige der Dateien überschreiben, andere anhängen), um möglicherweise auch mit RDDs das gewünschte Verhalten zu erzielen.
Das Ausgabeformat wird übergeben an: saveAsNewAPIHadoopFile (dies ist auch die Methode saveAsTextFile, die aufgerufen wird, um die Dateien tatsächlich zu speichern). Der Output Committer wird auf Anwendungsebene konfiguriert.
quelle