Wie kann ich die Nummer, die Kerne und den Executor-Speicher des Funken-Executors einstellen?

84

Wo fangen Sie an, die oben genannten Parameter abzustimmen? Beginnen wir mit dem Executor-Speicher und erhalten die Anzahl der Executoren oder beginnen wir mit Kernen und erhalten die Executor-Nummer. Ich bin dem Link gefolgt . Ich habe jedoch eine hochrangige Idee, bin mir aber immer noch nicht sicher, wie oder wo ich anfangen soll, um zu einem endgültigen Ergebnis zu gelangen.

Ramzy
quelle

Antworten:

206

Die folgende Antwort behandelt die drei im Titel genannten Hauptaspekte - Anzahl der Ausführenden, Ausführerspeicher und Anzahl der Kerne. Möglicherweise gibt es andere Parameter wie den Treiberspeicher und andere, die ich zum Zeitpunkt dieser Antwort nicht angesprochen habe, die ich aber in naher Zukunft hinzufügen möchte.

Fall 1 Hardware - 6 Knoten und jeder Knoten 16 Kerne, 64 GB RAM

Jeder Executor ist eine JVM-Instanz. Wir können also mehrere Executoren in einem einzigen Knoten haben

Für Betriebssystem- und Hadoop-Daemons werden zunächst 1 Kern und 1 GB benötigt. Es stehen also 15 Kerne und 63 GB RAM für jeden Knoten zur Verfügung

Beginnen Sie mit der Auswahl der Anzahl der Kerne :

Number of cores = Concurrent tasks as executor can run 

So we might think, more concurrent tasks for each executor will give better performance. But research shows that
any application with more than 5 concurrent tasks, would lead to bad show. So stick this to 5.

This number came from the ability of executor and not from how many cores a system has. So the number 5 stays same
even if you have double(32) cores in the CPU.

Anzahl der Testamentsvollstrecker:

Coming back to next step, with 5 as cores per executor, and 15 as total available cores in one Node(CPU) - we come to 
3 executors per node.

So with 6 nodes, and 3 executors per node - we get 18 executors. Out of 18 we need 1 executor (java process) for AM in YARN we get 17 executors

This 17 is the number we give to spark using --num-executors while running from spark-submit shell command

Speicher für jeden Executor:

From above step, we have 3 executors  per node. And available RAM is 63 GB

So memory for each executor is 63/3 = 21GB. 

However small overhead memory is also needed to determine the full memory request to YARN for each executor.
Formula for that over head is max(384, .07 * spark.executor.memory)

Calculating that overhead - .07 * 21 (Here 21 is calculated as above 63/3)
                            = 1.47

Since 1.47 GB > 384 MB, the over head is 1.47.
Take the above from each 21 above => 21 - 1.47 ~ 19 GB

So executor memory - 19 GB

Endgültige Zahlen - Executors - 17, Cores 5, Executor Memory - 19 GB


Fall 2 Hardware: Gleicher 6 Knoten, 32 Kerne, 64 GB

5 ist das gleiche für eine gute Parallelität

Anzahl der Ausführenden für jeden Knoten = 32/5 ~ 6

Also Gesamtausführende = 6 * 6 Knoten = 36. Dann ist die endgültige Zahl 36 - 1 für AM = 35

Der Executor-Speicher beträgt: 6 Executor für jeden Knoten. 63/6 ~ 10. Der Overhead beträgt 0,07 * 10 = 700 MB. Wenn wir also über Kopf auf 1 GB runden, erhalten wir 10-1 = 9 GB

Endgültige Nummern - Executors - 35, Cores 5, Executor Memory - 9 GB


Fall 3

Die obigen Szenarien beginnen damit, die Anzahl der Kerne als fest zu akzeptieren und auf die Anzahl der Ausführenden und den Speicher zu verschieben.

Wenn wir für den ersten Fall glauben, dass wir keine 19 GB benötigen und nur 10 GB ausreichen, dann sind die folgenden Zahlen:

Kerne 5 Anzahl der Ausführenden für jeden Knoten = 3

Zu diesem Zeitpunkt würde dies nach unserer ersten Berechnung zu 21 und dann zu 19 führen. Aber da wir dachten, 10 sei in Ordnung (nehmen Sie wenig Overhead an), können wir die Anzahl der Executoren pro Knoten nicht auf 6 setzen (wie 63/10). Mit 6 Executoren pro Knoten und 5 Kernen sind es 30 Kerne pro Knoten, wenn wir nur 16 Kerne haben. Daher müssen wir auch die Anzahl der Kerne für jeden Executor ändern.

Also nochmal rechnen,

Die magische Zahl 5 ergibt 3 (jede Zahl kleiner oder gleich 5). Mit 3 Kernen und 15 verfügbaren Kernen erhalten wir also 5 Executoren pro Knoten. Also (5 * 6 -1) = 29 Testamentsvollstrecker

Der Speicher ist also 63/5 ~ 12. Der Overhead beträgt 12 * .07 = .84 Der Executor-Speicher beträgt also 12 - 1 GB = 11 GB

Die endgültigen Zahlen sind 29 Executoren, 3 Kerne, der Executor-Speicher beträgt 11 GB


Dynamische Zuordnung:

Hinweis: Obergrenze für die Anzahl der Ausführenden, wenn die dynamische Zuordnung aktiviert ist. Dies besagt also, dass die Funkenanwendung bei Bedarf alle Ressourcen verschlingen kann. Stellen Sie daher in einem Cluster, in dem andere Anwendungen ausgeführt werden und für deren Ausführung auch Kerne erforderlich sind, sicher, dass Sie dies auf Clusterebene tun. Ich meine, Sie können YARN basierend auf dem Benutzerzugriff eine bestimmte Anzahl von Kernen zuweisen. Sie können also spark_user erstellen und dann Kerne (min / max) für diesen Benutzer angeben. Diese Grenzwerte gelten für die gemeinsame Nutzung zwischen Spark- und anderen Anwendungen, die auf YARN ausgeführt werden.

spark.dynamicAllocation.enabled - Wenn dies auf true gesetzt ist - Wir müssen keine Executoren erwähnen. Der Grund ist unten:

Die statische Parameternummer, die wir bei spark-submit angeben, gilt für die gesamte Auftragsdauer. Wenn jedoch eine dynamische Zuordnung ins Bild kommt, gibt es verschiedene Phasen wie

Was soll ich anfangen:

Anfangsanzahl der Executoren ( spark.dynamicAllocation.initialExecutors )

Wie viele :

Dann basierend auf der Last (ausstehende Aufgaben), wie viele angefordert werden sollen. Dies wären schließlich die Zahlen, die wir bei spark-submit statisch angeben. Sobald die anfänglichen Executor-Nummern festgelegt sind, gehen wir zu den Nummern min ( spark.dynamicAllocation.minExecutors ) und max ( spark.dynamicAllocation.maxExecutors ).

Wann fragen oder geben:

Wann fordern wir neue Executoren an ( spark.dynamicAllocation.schedulerBacklogTimeout ) - Für diese Zeitspanne wurden ausstehende Aufgaben ausgeführt. also bitte anfordern. Die Anzahl der in jeder Runde angeforderten Testamentsvollstrecker steigt gegenüber der vorherigen Runde exponentiell an. Beispielsweise fügt eine Anwendung in der ersten Runde 1 Executor und in den folgenden Runden 2, 4, 8 usw. hinzu. An einem bestimmten Punkt kommt das obige Maximum ins Spiel

Wann verschenken wir einen Executor ( spark.dynamicAllocation.executorIdleTimeout ) -

Bitte korrigieren Sie mich, wenn ich etwas verpasst habe. Das Obige basiert auf dem Blog, das ich in Frage gestellt habe, und einigen Online-Ressourcen. Vielen Dank.

Verweise:

Ramzy
quelle
2
Ich habe irgendwo gelesen, dass es im Standalone-Modus nur einen Executor pro Knoten gibt. Ich sehe es nicht in Ihrer Antwort.
Jangorecki
2
In einem eigenständigen Cluster erhalten wir standardmäßig einen Executor pro Worker. Wir müssen mit spark.executor.cores spielen und ein Arbeiter hat genug Kerne, um mehr als einen Executor zu bekommen.
Ramzy
Ich habe festgestellt, dass mein Mitarbeiter alle 32 Kerne ohne Einrichtung verwendet spark.executor.cores, sodass möglicherweise alle standardmäßig verfügbaren Kerne verwendet werden.
Jangorecki
Ja, die Standardeinstellung für Kerne ist unendlich, wie sie sagen. Spark kann also alle verfügbaren Kerne verwenden, sofern Sie dies nicht angeben.
Ramzy
2
@Ramzy Ich denke, es sollte beachtet werden, dass Sie auch bei dynamischer Zuweisung spark.executor.cores angeben sollten, um die Größe jedes Executors zu bestimmen, den Spark zuweisen wird. Wenn Spark Ihrer Anwendung einen neuen Executor zuweist, wird andernfalls ein ganzer Knoten (falls verfügbar) zugewiesen, auch wenn Sie nur fünf weitere Kerne benötigen.
Dan Markhasin
6

Es hängt auch von Ihrem Anwendungsfall ab. Ein wichtiger Konfigurationsparameter ist:

spark.memory.fraction(Bruchteil von (Heap-Speicherplatz - 300 MB), der für die Ausführung und Speicherung verwendet wird) von http://spark.apache.org/docs/latest/configuration.html#memory-management .

Wenn Sie Cache / Persist nicht verwenden, setzen Sie es auf 0.1, damit Sie den gesamten Speicher für Ihr Programm haben.

Wenn Sie Cache / Persist verwenden, können Sie den Speicher überprüfen, der von:

sc.getExecutorMemoryStatus.map(a => (a._2._1 - a._2._2)/(1024.0*1024*1024)).sum

Lesen Sie Daten von HDFS oder von HTTP?

Auch hier hängt eine Abstimmung von Ihrem Anwendungsfall ab.

Thomas Decaux
quelle
Wissen Sie, wie der Befehl map bei Verwendung von pyspark aussehen würde? Ich benutze sc._jsc.sc (). GetExecutorMemoryStatus (), um den Executor-Status zu erhalten, kann aber nichts mit dem tun, was er zurückgibt ...
Thomas
1
Sorry @Thomas Decaux , aber meintest du Einstellung spark.memory.storageFraction=0.1? Da AFAIK spark.memory.fractionentscheidet, wie viel Speicher sowohl Sparkfür die Ausführung als auch für die Speicherung verwendet wird (das haben Sie bereits erwähnt) und nur spark.memory.storageFractionder Speicher (der für die Speicherung und Ausführung verfügbar ist), ist er gegen Cache-Räumung immun . Bitte sehen Sie diesen Link
y2k-shubham
@Thomas Wenn ich bei meiner Bewerbung nur persist (StorageLevel.DISK_ONLY) habe, gilt diese Option auch, oder? Es wirkt sich nur auf den Speicheranteil aus, aber nicht auf das Verschütten von Festplatten.
jk1