Spark java.lang.OutOfMemoryError: Java-Heapspeicher

228

Mein Cluster: 1 Master, 11 Slaves, jeder Knoten hat 6 GB Speicher.

Meine Einstellungen:

spark.executor.memory=4g, Dspark.akka.frameSize=512

Hier ist das Problem:

Zuerst habe ich einige Daten (2,19 GB) von HDFS zu RDD gelesen:

val imageBundleRDD = sc.newAPIHadoopFile(...)

Zweitens , machen Sie etwas auf diesem RDD:

val res = imageBundleRDD.map(data => {
                               val desPoints = threeDReconstruction(data._2, bg)
                                 (data._1, desPoints)
                             })

Zuletzt Ausgabe an HDFS:

res.saveAsNewAPIHadoopFile(...)

Wenn ich mein Programm starte, wird Folgendes angezeigt:

.....
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:24 as TID 33 on executor 9: Salve7.Hadoop (NODE_LOCAL)
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Serialized task 1.0:24 as 30618515 bytes in 210 ms
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:36 as TID 34 on executor 2: Salve11.Hadoop (NODE_LOCAL)
14/01/15 21:42:28 INFO cluster.ClusterTaskSetManager: Serialized task 1.0:36 as 30618515 bytes in 449 ms
14/01/15 21:42:28 INFO cluster.ClusterTaskSetManager: Starting task 1.0:32 as TID 35 on executor 7: Salve4.Hadoop (NODE_LOCAL)
Uncaught error from thread [spark-akka.actor.default-dispatcher-3] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[spark]
java.lang.OutOfMemoryError: Java heap space

Es gibt zu viele Aufgaben?

PS : Alles ist in Ordnung, wenn die Eingabedaten ungefähr 225 MB betragen.

Wie kann ich dieses Problem lösen?

hequn8128
quelle
Wie läuft Funke? ist es von der Konsole? oder welche Bereitstellungsskripte verwenden Sie?
Tombart
Ich benutze sbt, um meine App zu kompilieren und auszuführen. sbt-Paket dann sbt ausführen. Ich habe vor einem Monat dasselbe Programm auf hadoop implementiert und bin auf dasselbe Problem von OutOfMemoryError gestoßen. In hadoop kann es jedoch leicht gelöst werden, indem der Wert von mapred.child.java.opts von Xmx200m auf Xmx400m erhöht wird. Hat Spark eine JVM-Einstellung für seine Aufgaben? Ich frage mich, ob spark.executor.memory dieselbe Bedeutung hat wie mapred.child.java.opts in hadoop. In meinem Programm wurde spark.executor.memory bereits auf 4 g eingestellt, das viel größer als Xmx400m in hadoop ist. Vielen Dank ~
hequn8128
Sind die drei Schritte, die Sie erwähnen, die einzigen, die Sie tun? Wie groß sind die von (data._1, desPoints) generierten Daten - dies sollte in den Speicher passen, insbesondere wenn diese Daten dann auf eine andere Stufe gemischt werden
Arnon Rotem-Gal-Oz
1
Wie ist die Speicherkonfiguration für den Treiber? Überprüfen Sie, auf welchem ​​Server der Speicherfehler angezeigt wird. Ist es der Fahrer oder einer der Vollstrecker?
RanP
Sehen Sie hier alle Konfigurationseigenschaften: spark.apache.org/docs/2.1.0/configuration.html
Naramsim

Antworten:

362

Ich habe ein paar Vorschläge:

  • Wenn Ihre Knoten so konfiguriert sind, dass sie maximal 6 g für Spark haben (und ein wenig für andere Prozesse übrig bleiben), verwenden Sie 6 g anstelle von 4 g spark.executor.memory=6g. Stellen Sie sicher, dass Sie so viel Speicher wie möglich verwenden, indem Sie die Benutzeroberfläche überprüfen (es wird angezeigt, wie viel Mem Sie verwenden).
  • Versuchen Sie, mehr Partitionen zu verwenden. Sie sollten 2 - 4 pro CPU haben. IME, das die Anzahl der Partitionen erhöht, ist oft der einfachste Weg, ein Programm stabiler (und oft schneller) zu machen. Für große Datenmengen benötigen Sie möglicherweise weit mehr als 4 pro CPU. In einigen Fällen musste ich 8000 Partitionen verwenden!
  • Verringern Sie den für das Caching reservierten Speicheranteil mithilfe von spark.storage.memoryFraction. Wenn Sie cache()oder persistin Ihrem Code nicht verwenden , kann dies genauso gut 0 sein. Der Standardwert ist 0,6, was bedeutet, dass Sie nur 0,4 * 4 g Speicher für Ihren Heap erhalten. Durch die Reduzierung des Mem Frac durch IME verschwinden OOMs häufig. UPDATE: Ab Spark 1.6 müssen wir anscheinend nicht mehr mit diesen Werten spielen, Spark ermittelt sie automatisch.
  • Ähnlich wie oben, jedoch Shuffle-Speicherfraktion . Wenn Ihr Job nicht viel Shuffle-Speicher benötigt, stellen Sie ihn auf einen niedrigeren Wert ein (dies kann dazu führen, dass Ihre Shuffles auf die Festplatte übertragen werden, was sich katastrophal auf die Geschwindigkeit auswirken kann). Manchmal, wenn es sich um eine Shuffle-Operation handelt, die OOMing ist, müssen Sie das Gegenteil tun, dh sie auf etwas Großes wie 0,8 einstellen oder sicherstellen, dass Ihre Shuffles auf die Festplatte übertragen werden (dies ist die Standardeinstellung seit 1.0.0).
  • Achten Sie auf Speicherlecks . Diese werden häufig durch versehentliches Schließen von Objekten verursacht, die Sie in Ihren Lambdas nicht benötigen. Die Art der Diagnose besteht darin, in den Protokollen nach der als XXX-Byte serialisierten Aufgabe Ausschau zu halten. Wenn XXX größer als einige k oder mehr als ein MB ist, liegt möglicherweise ein Speicherverlust vor. Siehe https://stackoverflow.com/a/25270600/1586965
  • Bezogen auf oben; Verwenden Sie Broadcast-Variablen, wenn Sie wirklich große Objekte benötigen.
  • Wenn Sie große RDDs cachen und kann einige Zugriffszeit opfern betrachten serializing die RDD http://spark.apache.org/docs/latest/tuning.html#serialized-rdd-storage . Oder sie sogar auf der Festplatte zwischenzuspeichern (was manchmal nicht so schlimm ist, wenn SSDs verwendet werden).
  • ( Erweitert ) Vermeiden Stringund stark verschachtelte Strukturen (wie Mapund verschachtelte Fallklassen). Versuchen Sie nach Möglichkeit, nur primitive Typen zu verwenden und alle nicht-primitiven zu indizieren, insbesondere wenn Sie viele Duplikate erwarten. Wählen WrappedArraySie nach Möglichkeit verschachtelte Strukturen aus. Oder führen Sie sogar Ihre eigene Serialisierung ein - SIE haben die meisten Informationen darüber, wie Sie Ihre Daten effizient in Bytes sichern können. BENUTZEN SIE ES !
  • ( etwas hackig ) Erwägen Sie beim Zwischenspeichern erneut, DatasetIhre Struktur mit a zwischenzuspeichern, da dies eine effizientere Serialisierung ermöglicht. Dies sollte im Vergleich zum vorherigen Aufzählungspunkt als Hack angesehen werden. Wenn Sie Ihr Domain-Wissen in Ihr Algo / Ihre Serialisierung integrieren, können Sie den Speicher- / Cache-Speicherplatz um das 100-fache oder 1000-fache minimieren, wohingegen alles Dataset, was Sie wahrscheinlich geben, 2x - 5x im Speicher und 10x komprimiert (Parkett) auf der Festplatte ist.

http://spark.apache.org/docs/1.2.1/configuration.html

BEARBEITEN: (damit ich mich leichter googeln kann) Folgendes weist ebenfalls auf dieses Problem hin:

java.lang.OutOfMemoryError : GC overhead limit exceeded
samthebest
quelle
Vielen Dank für Ihre Vorschläge ~ Wenn ich spark.executor.memory = 6g setze, tritt bei spark das Problem auf: "Überprüfen Sie die Benutzeroberfläche Ihres Clusters, um sicherzustellen, dass die Mitarbeiter registriert sind und über ausreichend Speicher verfügen." Das Setzen von spark.storage.memoryFraction auf 0.1 kann das Problem ebenfalls nicht lösen. Vielleicht liegt das Problem in meinem Code. Danke!
hequn8128
2
@samthebest Dies ist eine fantastische Antwort. Ich schätze die Protokollierungshilfe zum Auffinden von Speicherlecks sehr.
Myles Baker
1
Hallo @samthebest, wie haben Sie 8000 Partitionen angegeben? Da ich Spark SQL verwende, kann ich Partition nur mit spark.sql.shuffle.partitions angeben. Der Standardwert ist 200, sollte ich ihn auf mehr setzen. Ich habe versucht, ihn auf 1000 zu setzen, aber ich helfe nicht, OOM zu erhalten. Wissen Sie, was das Optimum sein sollte? Partitionswert Ich habe 1 TB verzerrte Daten zu verarbeiten und es handelt sich um Gruppen-nach-Hive-Abfragen. Bitte führen.
Umesh K
2
Hallo @ user449355, könnten Sie bitte eine neue Frage stellen? Aus Angst, einen langen Kommentarthread zu beginnen :) Wenn Sie Probleme haben, sind es wahrscheinlich andere Leute, und eine Frage würde es einfacher machen, sie für alle zu finden.
Samthebest
1
Bis zu Ihrem ersten Punkt, @samthebest, sollten Sie nicht den gesamten Speicher verwenden, spark.executor.memoryda Sie definitiv etwas Speicher für den E / A-Overhead benötigen. Wenn Sie alles verwenden, wird Ihr Programm verlangsamt. Die Ausnahme hiervon könnte Unix sein. In diesem Fall haben Sie Swap Space.
Hunle
58

Um diesem häufig nicht diskutierten SparkAnwendungsfall einen Anwendungsfall hinzuzufügen, werde ich eine Lösung vorschlagen, wenn ich einen Antrag über spark-submitim lokalen Modus einreiche .

Laut dem Gitbook Mastering Apache Spark von Jacek Laskowski :

Sie können Spark im lokalen Modus ausführen. In diesem nicht verteilten Einzel-JVM-Bereitstellungsmodus erzeugt Spark alle Ausführungskomponenten - Treiber, Executor, Backend und Master - in derselben JVM. Dies ist der einzige Modus, in dem ein Treiber zur Ausführung verwendet wird.

Wenn also OOMFehler mit dem auftreten heap, reicht es aus, das driver-memoryund nicht das anzupassen executor-memory.

Hier ist ein Beispiel:

spark-1.6.1/bin/spark-submit
  --class "MyClass"
  --driver-memory 12g
  --master local[*] 
  target/scala-2.10/simple-project_2.10-1.0.jar 
Brian
quelle
Wie viel Prozent sollten wir für den Treiberspeicher im Standalone-Modus berücksichtigen?
Yashwanth Kambala
@Brian, Muss der Treiberspeicher im lokalen Modus größer sein als die Eingabedatengröße? Ist es möglich, die Anzahl der Partitionen für das Eingabedatensatz anzugeben, damit der Spark-Job mit einem Datensatz umgehen kann, der viel größer als der verfügbare RAM ist?
fuyi vor
19

Sie sollten die OffHeap-Speichereinstellungen wie folgt konfigurieren:

val spark = SparkSession
     .builder()
     .master("local[*]")
     .config("spark.executor.memory", "70g")
     .config("spark.driver.memory", "50g")
     .config("spark.memory.offHeap.enabled",true)
     .config("spark.memory.offHeap.size","16g")   
     .appName("sampleCodeForReference")
     .getOrCreate()

Geben Sie den Treiberspeicher und den Executor-Speicher gemäß der RAM-Verfügbarkeit Ihres Computers an. Sie können die Größe von offHeap erhöhen, wenn das OutofMemory-Problem weiterhin besteht .

pavan.vn101
quelle
OffHeap Einstellung hinzugefügt hat geholfen
kennyut
2
Das Einstellen des Treiberspeichers in Ihrem Code funktioniert nicht. Lesen Sie dazu die Spark-Dokumentation: Die Spark-Eigenschaften können hauptsächlich in zwei Arten unterteilt werden: Eine bezieht sich auf die Bereitstellung, z. B. "spark.driver.memory", "spark.executor.instances", Diese Art von Eigenschaften wird möglicherweise nicht beeinflusst, wenn sie zur Laufzeit programmgesteuert über SparkConf festgelegt werden. Das Verhalten hängt davon ab, welchen Cluster-Manager und Bereitstellungsmodus Sie auswählen. Es wird daher empfohlen, diese über die Konfigurationsdatei oder die Befehlszeilenoptionen für das Senden von Funken festzulegen.
Abdulhafeth Sartawi
1
DIE BESTE ANTWORT! Mein Problem war, dass Spark nicht am Masterknoten installiert war. Ich habe nur PySpark verwendet, um eine Verbindung zu HDFS herzustellen, und habe den gleichen Fehler erhalten. Mit hat configdas Problem gelöst.
Mikhail_Sam
Ich habe gerade die Konfigurationen mit dem Befehl spark-submit hinzugefügt, um das Problem mit der Heap-Größe zu beheben. Vielen Dank.
Pritam Sadhukhan
16

Sie sollten den Treiberspeicher erhöhen. In Ihrem Ordner $ SPARK_HOME / conf sollten Sie die Datei finden spark-defaults.conf, bearbeiten und einstellen, spark.driver.memory 4000mabhängig vom Speicher Ihres Masters, denke ich. Dies hat das Problem für mich behoben und alles läuft reibungslos

Blueskin
quelle
Wie viel Prozent des Mem zugeteilt werden sollen, im Alleingang
Yashwanth Kambala
14

Schauen Sie sich die Startskripte an, für die dort eine Java-Heap-Größe festgelegt ist. Es sieht so aus, als würden Sie dies nicht festlegen, bevor Sie den Spark-Worker ausführen.

# Set SPARK_MEM if it isn't already set since we also use it for this process
SPARK_MEM=${SPARK_MEM:-512m}
export SPARK_MEM

# Set JAVA_OPTS to be able to load native libraries and to set heap size
JAVA_OPTS="$OUR_JAVA_OPTS"
JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$SPARK_LIBRARY_PATH"
JAVA_OPTS="$JAVA_OPTS -Xms$SPARK_MEM -Xmx$SPARK_MEM"

Die Dokumentation zum Bereitstellen von Skripten finden Sie hier .

Tombart
quelle
Danke ~ Ich werde es später versuchen. Aus der Spark-Benutzeroberfläche geht hervor, dass der Speicher jedes Executors 4096 ist. Die Einstellung wurde also aktiviert, oder?
hequn8128
Ich habe Ihre Antwort gesehen, während ich vor einem ähnlichen Problem stehe ( stackoverflow.com/questions/34762432/… ). Wenn Sie den von Ihnen angegebenen Link betrachten, sieht es so aus, als ob die Einstellung von Xms / Xmx nicht mehr vorhanden ist. Können Sie sagen, warum?
Seffy
Der Inhalt des von verlinkten Skripts start up scriptshat sich leider geändert. Vom 19.12.2019
David Groomes
7

Ich habe sehr unter diesem Problem gelitten, wir verwenden die dynamische Ressourcenzuweisung und ich dachte, dass meine Clusterressourcen verwendet werden, um am besten zur Anwendung zu passen.

Die Wahrheit ist jedoch, dass die dynamische Ressourcenzuweisung den Treiberspeicher nicht festlegt und ihn auf seinem Standardwert von 1 g hält.

Ich habe es behoben, indem ich spark.driver.memory auf eine Zahl gesetzt habe, die zum Speicher meines Treibers passt (für 32 GB RAM habe ich es auf 18 GB eingestellt).

Sie können es mit dem Befehl spark submit wie folgt einstellen:

spark-submit --conf spark.driver.memory=18gb ....cont

Sehr wichtiger Hinweis: Diese Eigenschaft wird nicht berücksichtigt, wenn Sie sie gemäß der Spark-Dokumentation aus dem Code festlegen:

Spark-Eigenschaften können hauptsächlich in zwei Arten unterteilt werden: Eine bezieht sich auf die Bereitstellung, z. B. "spark.driver.memory", "spark.executor.instances". Diese Art von Eigenschaften wird möglicherweise nicht beeinflusst, wenn sie zur Laufzeit programmgesteuert über SparkConf festgelegt werden Das Verhalten hängt davon ab, welchen Cluster-Manager und Bereitstellungsmodus Sie auswählen. Es wird daher empfohlen, die Befehlszeilenoptionen über die Konfigurationsdatei oder die Spark-Submit-Option festzulegen. Eine andere bezieht sich hauptsächlich auf die Spark-Laufzeitsteuerung, wie z. B. "spark.task.maxFailures". Diese Art von Eigenschaften kann auf beide Arten festgelegt werden.

Abdulhafeth Sartawi
quelle
2
Sie sollten verwenden --conf spark.driver.memory = 18g
Merenptah
5

Im Allgemeinen kann der Spark Executor JVM-Speicher in zwei Teile unterteilt werden. Funken- und Benutzerspeicher. Dies wird durch die Eigenschaft gesteuert spark.memory.fraction- der Wert liegt zwischen 0 und 1. Wenn Sie mit Bildern arbeiten oder eine speicherintensive Verarbeitung in Funkenanwendungen durchführen, sollten Sie den Wert verringernspark.memory.fraction . Dadurch wird mehr Speicher für Ihre Anwendungsarbeit verfügbar. Spark kann auslaufen, sodass es immer noch mit weniger Speicherfreigabe funktioniert.

Der zweite Teil des Problems ist die Arbeitsteilung. Wenn möglich, partitionieren Sie Ihre Daten in kleinere Blöcke. Kleinere Daten benötigen möglicherweise weniger Speicher. Wenn dies jedoch nicht möglich ist, opfern Sie die Berechnung für das Gedächtnis. In der Regel führt ein einzelner Executor mehrere Kerne aus. Der Gesamtspeicher der Ausführenden muss ausreichen, um den Speicherbedarf aller gleichzeitigen Aufgaben zu decken. Wenn das Erhöhen des Executor-Speichers nicht möglich ist, können Sie die Kerne pro Executor verringern, damit jede Aufgabe mehr Arbeitsspeicher erhält. Testen Sie mit 1 Core-Executoren, die über den größtmöglichen Speicher verfügen, den Sie geben können, und erhöhen Sie dann die Anzahl der Kerne, bis Sie die beste Core-Anzahl gefunden haben.

Rohit Karlupia
quelle
5

Haben Sie Ihr Master-GC-Protokoll gelöscht? Also bin ich auf ein ähnliches Problem gestoßen und habe festgestellt, dass SPARK_DRIVER_MEMORY nur den Xmx-Heap gesetzt hat. Die anfängliche Heap-Größe bleibt 1 G und die Heap-Größe wird niemals auf den Xmx-Heap skaliert.

Das Übergeben von "--conf" spark.driver.extraJavaOptions = -Xms20g "behebt mein Problem.

ps aux | grep java und du siehst das folgende log: =

24501 30,7 1,7 41782944 2318184 pts / 0 Sl + 18:49 0:33 / usr / java / latest / bin / java -cp / opt / spark / conf /: / opt / spark / jars / * -Xmx30g -Xms20g

Yunzhao Yang
quelle
3

Der Speicherort für die Größe des Speicherheaps (mindestens in spark-1.0.0) ist conf / spark-env. Die relevanten Variablen sind SPARK_EXECUTOR_MEMORY& SPARK_DRIVER_MEMORY. Weitere Dokumente finden Sie im Bereitstellungshandbuch

Vergessen Sie auch nicht, die Konfigurationsdatei auf alle Slave-Knoten zu kopieren.

Amnon
quelle
4
Woher weißt du, welches zwischen SPARK_EXECUTOR_MEMORY& eingestellt werden soll SPARK_DRIVER_MEMORY?
Hunle
13
dh welcher Fehler würde Ihnen sagen, dass Sie die erhöhen sollen SPARK_EXECUTOR_MEMORY, und welcher Fehler würde Ihnen sagen, dass Sie die erhöhen sollen SPARK_DRIVER_MEMORY?
Hunle
2

Ich habe nur wenige Vorschläge für den oben genannten Fehler.

● Überprüfen Sie, ob der als Executor zugewiesene Executor-Speicher möglicherweise mit Partitionen umgehen muss, die mehr Speicher benötigen als zugewiesen.

● Versuchen Sie festzustellen, ob mehr Shuffles aktiv sind, da Shuffles teure Vorgänge sind, da sie Festplatten-E / A, Datenserialisierung und Netzwerk-E / A umfassen

● Verwenden Sie Broadcast-Joins

● Vermeiden Sie die Verwendung von groupByKeys und versuchen Sie, diese durch ReduceByKey zu ersetzen

● Vermeiden Sie die Verwendung großer Java-Objekte überall dort, wo gemischt wird

Unmesha SreeVeni
quelle
Es tut mir leid, die Abfrage eines anderen zu entführen, aber wie wird reductByKey über groupBy verwendet?
Somil Aseeja
1

Nach meinem Verständnis des oben bereitgestellten Codes wird die Datei geladen, der Kartenvorgang ausgeführt und zurückgespeichert. Es gibt keine Operation, die ein Mischen erfordert. Es gibt auch keine Operation, bei der Daten zum Treiber gebracht werden müssen, sodass das Optimieren von Änderungen im Zusammenhang mit dem Mischen oder dem Treiber möglicherweise keine Auswirkungen hat. Der Treiber hat Probleme, wenn es zu viele Aufgaben gibt, aber dies war nur bis zur Version Spark 2.0.2. Es kann zwei Dinge geben, die schief gehen.

  • Es gibt nur einen oder wenige Testamentsvollstrecker. Erhöhen Sie die Anzahl der Executoren, damit sie verschiedenen Slaves zugewiesen werden können. Wenn Sie Garn verwenden, müssen Sie die Konfiguration der Num-Executors ändern, oder wenn Sie Spark Standalone verwenden, müssen Sie Num-Cores pro Executor und Spark Max Cores Conf einstellen. In Standalone-Anzahl Executoren = max Kerne / Kerne pro Executor.
  • Die Anzahl der Partitionen ist sehr gering oder nur eine. Wenn dies niedrig ist, selbst wenn wir mehrere Kerne und mehrere Executoren haben, ist dies keine große Hilfe, da die Parallelisierung von der Anzahl der Partitionen abhängt. Erhöhen Sie also die Partitionen, indem Sie imageBundleRDD.repartition (11) ausführen.
Shridhar
quelle
0

Durch Festlegen dieser genauen Konfigurationen konnte das Problem behoben werden.

spark-submit --conf spark.yarn.maxAppAttempts=2 --executor-memory 10g --num-executors 50 --driver-memory 12g
Swapnil Shashank
quelle