Ich führe einen Spark-Job mit in einem Spekulationsmodus aus. Ich habe ungefähr 500 Aufgaben und ungefähr 500 Dateien mit 1 GB gz komprimiert. Ich bekomme in jedem Job für 1-2 Aufgaben den angehängten Fehler, bei dem er dutzende Male wiederholt wird (wodurch verhindert wird, dass der Job abgeschlossen wird).
org.apache.spark.shuffle.MetadataFetchFailedException: Fehlender Ausgabeort für Shuffle 0
Irgendeine Idee, was das Problem bedeutet und wie man es überwinden kann?
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:384)
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:381)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:380)
at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:176)
at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
apache-spark
Dotan
quelle
quelle
LostExecutor
INFO-Nachrichten gesehen? Können Sie die Executors-Seite der Web-Benutzeroberfläche überprüfen und sehen, wie sich Executors verhalten, insb. GC-weise?Antworten:
Dies ist mir passiert, als ich dem Worker-Knoten mehr Speicher gegeben habe als er hat. Da es keinen Austausch gab, stürzte der Funke ab, als versucht wurde, Objekte zum Mischen zu speichern, ohne dass mehr Speicher übrig war.
Die Lösung bestand darin, entweder Swap hinzuzufügen oder den Worker / Executor so zu konfigurieren, dass zusätzlich zur Verwendung der Speicherebene MEMORY_AND_DISK für mehrere Persists weniger Speicher verwendet wird.
quelle
Wir hatten einen ähnlichen Fehler mit Spark, aber ich bin nicht sicher, ob er mit Ihrem Problem zusammenhängt.
Wir haben
JavaPairRDD.repartitionAndSortWithinPartitions
100 GB Daten verwendet und es ist ähnlich wie bei Ihrer App immer wieder fehlgeschlagen. Dann haben wir uns die Garnprotokolle auf den spezifischen Knoten angesehen und festgestellt, dass wir ein Problem mit nicht genügend Speicher haben, sodass das Garn die Ausführung unterbrochen hat. Unsere Lösung war zu ändern /spark.shuffle.memoryFraction 0
in.../spark/conf/spark-defaults.conf
. Dadurch konnten wir auf diese Weise eine viel größere (aber leider nicht unendliche) Datenmenge verarbeiten.quelle
Ich habe das gleiche Problem auf meinem 3-Computer-YARN-Cluster. Ich habe den Arbeitsspeicher ständig gewechselt, aber das Problem blieb bestehen. Schließlich sah ich die folgenden Meldungen in den Protokollen:
und danach gab es diese Nachricht:
Ich habe die Eigenschaften in spark-defaults.conf wie folgt geändert:
Das ist es! Mein Job wurde danach erfolgreich abgeschlossen.
quelle
spark.executor.heartbeatInterval should be significantly less than spark.network.timeout
. Daher ist es möglicherweise nicht die beste Idee, beide auf denselben Wert zu setzen.Für mich machte ich ein Fenster mit großen Datenmengen (ungefähr 50B Zeilen) und bekam eine Bootsladung von
In meinen Protokollen. Offensichtlich kann 4096 bei einer solchen Datengröße klein sein ... dies führte mich zu folgendem JIRA:
https://issues.apache.org/jira/browse/SPARK-21595
Und letztendlich zu den folgenden zwei Konfigurationsoptionen:
spark.sql.windowExec.buffer.spill.threshold
spark.sql.windowExec.buffer.in.memory.threshold
Beide sind standardmäßig 4096; Ich habe sie viel höher angehoben (2097152) und die Dinge scheinen jetzt gut zu laufen. Ich bin mir nicht 100% sicher, ob dies das gleiche ist wie das hier angesprochene Problem, aber es ist eine andere Sache, die Sie versuchen sollten.
quelle
Ich habe diesen Fehler behoben und den zugewiesenen Speicher in executorMemory und driverMemory erhöht. Sie können dies in HUE tun, indem Sie das Spark-Programm auswählen, das das Problem verursacht, und in den Eigenschaften -> Optionsliste können Sie Folgendes hinzufügen:
Natürlich variieren die Werte der Parameter abhängig von der Größe Ihres Clusters und Ihren Anforderungen.
quelle
Wenn in der Spark-Web-Benutzeroberfläche Informationen wie vorhanden sind
Executors lost
, müssen Sie das Garnprotokoll überprüfen und sicherstellen, dass Ihr Container getötet wurde.Wenn der Container getötet wurde, liegt dies wahrscheinlich an dem Mangel an Speicher.
Wie finde ich die wichtigsten Informationen in Garnprotokollen? Zum Beispiel könnte es einige Warnungen wie diese geben:
In diesem Fall sollten Sie erhöhen
spark.yarn.executor.memoryOverhead
.quelle
In meinem Fall (Standalone-Cluster) wurde die Ausnahme ausgelöst, da das Dateisystem einiger Spark-Slaves zu 100% gefüllt war. Das Löschen aller Elemente in den
spark/work
Ordnern der Slaves löste das Problem.quelle
Ich habe das gleiche Problem, aber ich habe viele Antworten gesucht, die mein Problem nicht lösen können. Schließlich debugge ich meinen Code Schritt für Schritt. Ich finde, dass das Problem, das durch die Datengröße verursacht wird, nicht für jede Partition ausgeglichen ist, was dazu führt,
MetadataFetchFailedException
dass inmap
Phase nichtreduce
Phase. einfachdf_rdd.repartition(nums)
vorher machenreduceByKey()
quelle