Ich versuche, die Beziehung zwischen der Anzahl der Kerne und der Anzahl der Ausführenden zu verstehen, wenn ein Spark-Job auf YARN ausgeführt wird.
Die Testumgebung ist wie folgt:
- Anzahl der Datenknoten: 3
- Datenknoten-Maschinenspezifikation:
- CPU: Core i7-4790 (Anzahl der Kerne: 4, Anzahl der Threads: 8)
- RAM: 32 GB (8 GB x 4)
- Festplatte: 8 TB (2 TB x 4)
Netzwerk: 1 GB
Spark-Version: 1.0.0
Hadoop-Version: 2.4.0 (Hortonworks HDP 2.1)
Spark-Jobfluss: sc.textFile -> Filter -> Map -> Filter -> MapToPair -> ReduceByKey -> Map -> SaveAsTextFile
Eingabedaten
- Typ: einzelne Textdatei
- Größe: 165 GB
- Anzahl der Zeilen: 454.568.833
Ausgabe
- Anzahl der Zeilen nach dem zweiten Filter: 310.640.717
- Anzahl der Zeilen der Ergebnisdatei: 99.848.268
- Größe der Ergebnisdatei: 41 GB
Der Job wurde mit folgenden Konfigurationen ausgeführt:
--master yarn-client --executor-memory 19G --executor-cores 7 --num-executors 3
(Ausführende pro Datenknoten verwenden so viel wie Kerne)--master yarn-client --executor-memory 19G --executor-cores 4 --num-executors 3
(Anzahl der Kerne reduziert)--master yarn-client --executor-memory 4G --executor-cores 2 --num-executors 12
(weniger Kern, mehr Vollstrecker)
Verstrichene Zeiten:
50 min 15 sek
55 min 48 sek
31 min 23 sek
Zu meiner Überraschung war (3) viel schneller.
Ich dachte, dass (1) schneller sein würde, da es beim Mischen weniger Kommunikation zwischen Ausführenden geben würde.
Obwohl die Anzahl der Kerne von (1) kleiner als (3) ist, ist die Anzahl der Kerne nicht der Schlüsselfaktor, da 2) eine gute Leistung erbracht hat.
(Nach der Antwort von pwilmot wurden folgende hinzugefügt.)
Zur Information lautet die Bildschirmaufnahme des Leistungsmonitors wie folgt:
- Ganglia-Datenknotenübersicht für (1) - Job gestartet um 04:37.
- Ganglia-Datenknotenübersicht für (3) - Job gestartet um 19:47. Bitte ignorieren Sie die Grafik vor diesem Zeitpunkt.
Das Diagramm ist grob in zwei Abschnitte unterteilt:
- Erstens: von Anfang bis zur ReduzierungByKey: CPU-intensiv, keine Netzwerkaktivität
- Zweitens: Nach dem Reduzieren von ByKey: CPU wird die Netzwerk-E / A abgeschlossen.
Wie die Grafik zeigt, kann (1) so viel CPU-Leistung verbrauchen, wie angegeben wurde. Es könnte also nicht das Problem der Anzahl der Threads sein.
Wie kann man dieses Ergebnis erklären?
quelle
Antworten:
Die Erklärung wurde in einem Artikel in Clouderas Blog " How-to: Tune Your Apache Spark Jobs" (Teil 2) gegeben .
quelle
yarn.scheduler.capacity.resource-calculator
Behinderte gilt, was die Standardeinstellung ist. Dies liegt daran, dass die Zeitplanung standardmäßig nach Speicher und nicht nach CPU erfolgt.Laut Sandy Ryza läuft Ihre Spark-App auf HDFS
Daher glaube ich, dass Ihre erste Konfiguration aufgrund des schlechten HDFS-E / A-Durchsatzes langsamer ist als die dritte
quelle
Ich habe selbst nicht mit diesen Einstellungen gespielt, daher handelt es sich nur um Spekulationen. Wenn wir dieses Problem jedoch als normale Kerne und Threads in einem verteilten System betrachten, können Sie in Ihrem Cluster bis zu 12 Kerne (4 * 3 Computer) und 24 Threads verwenden (8 * 3 Maschinen). In Ihren ersten beiden Beispielen geben Sie Ihrem Job eine angemessene Anzahl von Kernen (potenzieller Rechenraum), aber die Anzahl der Threads (Jobs), die auf diesen Kernen ausgeführt werden sollen, ist so begrenzt, dass Sie nicht viel von der zugewiesenen Verarbeitungsleistung verwenden können und daher ist der Job langsamer, obwohl mehr Rechenressourcen zugewiesen sind.
Sie erwähnen, dass Ihr Anliegen im Shuffle-Schritt lag. Obwohl es hilfreich ist, den Overhead im Shuffle-Schritt zu begrenzen, ist es im Allgemeinen viel wichtiger, die Parallelisierung des Clusters zu nutzen. Denken Sie an den Extremfall - ein Single-Threaded-Programm ohne Shuffle.
quelle
Ich denke, die Antwort hier ist möglicherweise etwas einfacher als einige der Empfehlungen hier.
Der Hinweis für mich ist im Cluster-Netzwerkdiagramm. Für Lauf 1 liegt die Auslastung konstant bei ~ 50 MByte / s. Für Lauf 3 wird die stetige Auslastung verdoppelt, etwa 100 MByte / s.
Von der cloudera Blog - Post von gemeinsamen DzOrd , können Sie dieses wichtige Angebot finden Sie unter :
Lassen Sie uns ein paar Berechnungen durchführen, um zu sehen, welche Leistung wir erwarten, wenn dies zutrifft.
Führen Sie 1: 19 GB, 7 Kerne und 3 Executoren aus
Führen Sie 3: 4 GB, 2 Kerne und 12 Executoren aus
Wenn der Job zu 100% durch Parallelität begrenzt ist (Anzahl der Threads). Wir würden erwarten, dass die Laufzeit perfekt umgekehrt mit der Anzahl der Threads korreliert.
So
ratio_num_threads ~= inv_ratio_runtime
, und es sieht , als wären wir netzwerkbeschränkt.Der gleiche Effekt erklärt den Unterschied zwischen Lauf 1 und Lauf 2.
Führen Sie 2: 19 GB, 4 Kerne und 3 Executoren aus
Vergleichen der Anzahl der effektiven Threads und der Laufzeit:
Es ist nicht so perfekt wie der letzte Vergleich, aber wir sehen immer noch einen ähnlichen Leistungsabfall, wenn wir Threads verlieren.
Nun zum letzten Punkt: Warum ist es so, dass wir mit mehr Threads eine bessere Leistung erzielen, insb. mehr Threads als die Anzahl der CPUs?
Eine gute Erklärung für den Unterschied zwischen Parallelität (was wir durch Aufteilen von Daten auf mehrere CPUs erhalten) und Parallelität (was wir erhalten, wenn wir mehrere Threads verwenden, um an einer einzelnen CPU zu arbeiten) finden Sie in diesem großartigen Beitrag von Rob Pike: Concurrency ist keine Parallelität .
Die kurze Erklärung lautet: Wenn ein Spark-Job mit einem Dateisystem oder Netzwerk interagiert, verbringt die CPU viel Zeit damit, auf die Kommunikation mit diesen Schnittstellen zu warten und nicht viel Zeit damit zu verbringen, tatsächlich "zu arbeiten". Indem Sie diesen CPUs mehr als eine Aufgabe gleichzeitig zur Verfügung stellen, verbringen sie weniger Zeit mit Warten und mehr Zeit mit Arbeiten, und Sie sehen eine bessere Leistung.
quelle
Aus den hervorragenden Ressourcen, die auf der Sparklyr-Paketseite von RStudio verfügbar sind :
quelle
Die dynamische Zuweisung von Spark bietet Flexibilität und weist Ressourcen dynamisch zu. In dieser Anzahl von Min- und Max-Executoren können angegeben werden. Es kann auch die Anzahl der Executoren angegeben werden, die zu Beginn der Anwendung gestartet werden müssen.
Lesen Sie unten auf dem gleichen:
quelle
Es gibt ein kleines Problem in den ersten beiden Konfigurationen, denke ich. Die Konzepte von Threads und Kernen wie folgt. Das Konzept des Threading ist, wenn die Kerne ideal sind, dann verwenden Sie diesen Kern, um die Daten zu verarbeiten. Daher ist der Speicher in den ersten beiden Fällen nicht voll ausgelastet. Wenn Sie dieses Beispiel als Benchmark verwenden möchten, wählen Sie die Maschinen mit mehr als 10 Kernen pro Maschine aus. Dann machen Sie die Benchmark.
Geben Sie jedoch nicht mehr als 5 Kerne pro Executor an, da die I / O-Leistung einen Flaschenhals aufweist.
Die besten Maschinen für dieses Benchmarking sind möglicherweise Datenknoten mit 10 Kernen.
Datenknoten-Maschinenspezifikation: CPU: Core i7-4790 (Anzahl der Kerne: 10, Anzahl der Threads: 20) RAM: 32 GB (8 GB x 4) Festplatte: 8 TB (2 TB x 4)
quelle
Ich denke, einer der Hauptgründe ist die Lokalität. Ihre Eingabedateigröße beträgt 165 GB, die zugehörigen Blöcke der Datei sind sicherlich auf mehrere DataNodes verteilt. Mehr Ausführende können eine Netzwerkkopie vermeiden.
Versuchen Sie, die Anzahl der Blöcke für Executor gleich zu setzen. Ich denke, das kann schneller sein.
quelle