Verwenden von spark 2.4.4 im YARN-Cluster-Modus mit dem Spark-FIFO-Scheduler.
Ich sende mehrere Spark-Datenrahmenoperationen (dh das Schreiben von Daten in S3) mit einem Thread-Pool-Executor mit einer variablen Anzahl von Threads. Dies funktioniert gut, wenn ich ~ 10 Threads habe, aber wenn ich Hunderte von Threads verwende, scheint es einen Deadlock zu geben, da keine Jobs gemäß der Spark-Benutzeroberfläche geplant sind.
Welche Faktoren steuern, wie viele Jobs gleichzeitig geplant werden können? Treiberressourcen (zB Speicher / Kerne)? Einige andere Einstellungen für die Funkenkonfiguration?
BEARBEITEN:
Hier ist eine kurze Zusammenfassung meines Codes
ExecutorService pool = Executors.newFixedThreadPool(nThreads);
ExecutorCompletionService<Void> ecs = new ExecutorCompletionService<>(pool);
Dataset<Row> aHugeDf = spark.read.json(hundredsOfPaths);
List<Future<Void>> futures = listOfSeveralHundredThings
.stream()
.map(aThing -> ecs.submit(() -> {
df
.filter(col("some_column").equalTo(aThing))
.write()
.format("org.apache.hudi")
.options(writeOptions)
.save(outputPathFor(aThing));
return null;
}))
.collect(Collectors.toList());
IntStream.range(0, futures.size()).forEach(i -> ecs.poll(30, TimeUnit.MINUTES));
exec.shutdownNow();
Irgendwann, wenn der nThreads
Anstieg zunimmt, scheint Spark keine Jobs mehr zu planen, wie gezeigt wird durch:
ecs.poll(...)
Zeitüberschreitung irgendwann- Auf der Registerkarte Spark UI-Jobs werden keine aktiven Jobs angezeigt
- Auf der Registerkarte "Executoren der Spark-Benutzeroberfläche" werden keine aktiven Aufgaben für einen Executor angezeigt
- Die Registerkarte Spark UI SQL zeigt
nThreads
laufende Abfragen ohne laufende Job-IDs an
Meine Ausführungsumgebung ist
- AWS EMR 5.28.1
- Funke 2.4.4
- Hauptknoten =
m5.4xlarge
- Kernknoten = 3x
rd5.24xlarge
spark.driver.cores=24
spark.driver.memory=32g
spark.executor.memory=21g
spark.scheduler.mode=FIFO
quelle
jstack -l
in beiden Fällen einen Thread-Dump mit Sperrinformationen aus.Antworten:
Wenn möglich, schreiben Sie die Ausgabe der Jobs in AWS Elastic MapReduce hdfs (um die fast sofortigen Umbenennungen und die bessere Datei-E / A der lokalen hdfs zu nutzen) und fügen Sie einen dstcp-Schritt hinzu, um die Dateien nach S3 zu verschieben, um sich alle Probleme bei der Behandlung zu ersparen Innereien eines Objektspeichers, der versucht, ein Dateisystem zu sein. Wenn Sie auch in lokale HDFS schreiben, können Sie Spekulationen ermöglichen, außer Kontrolle geratene Aufgaben zu steuern, ohne in die mit DirectOutputCommiter verknüpften Deadlock-Traps zu geraten.
Wenn Sie S3 als Ausgabeverzeichnis verwenden müssen, stellen Sie sicher, dass die folgenden Spark-Konfigurationen festgelegt sind
Hinweis: DirectParquetOutputCommitter wird aufgrund der Möglichkeit eines Datenverlusts aus Spark 2.0 entfernt. Leider müssen wir mit den Problemumgehungen arbeiten, bis wir die Konsistenz von S3a verbessert haben. Mit Hadoop 2.8 verbessern sich die Dinge
Vermeiden Sie Schlüsselnamen in lexikografischer Reihenfolge. Sie können Hashing / zufällige Präfixe oder die umgekehrte Datums- und Uhrzeit verwenden, um sich fortzubewegen. Der Trick besteht darin, Ihre Schlüssel hierarchisch zu benennen und die häufigsten Dinge, nach denen Sie filtern, auf der linken Seite Ihres Schlüssels zu platzieren. Und haben Sie niemals Unterstriche in Bucket-Namen aufgrund von DNS-Problemen.
fs.s3a.fast.upload upload
Parallele Aktivierung von Teilen einer einzelnen Datei für Amazon S3Weitere Informationen finden Sie in diesen Artikeln.
Festlegen der spark.speculation in Spark 2.1.0 beim Schreiben in s3
https://medium.com/@subhojit20_27731/apache-spark-and-amazon-s3-gotchas-and-best-practices-a767242f3d98
quelle
IMO nähern Sie sich diesem Problem wahrscheinlich falsch. Wenn Sie nicht garantieren können, dass die Anzahl der Aufgaben pro Job sehr gering ist, werden Sie wahrscheinlich keine große Leistungsverbesserung erzielen, wenn Sie Hunderte von Jobs gleichzeitig parallelisieren. Ihr Cluster kann nur 300 Aufgaben gleichzeitig unterstützen, vorausgesetzt, Sie verwenden die Standardparallelität von 200, dh nur 1,5 Jobs. Ich würde vorschlagen, Ihren Code neu zu schreiben, um die maximale Anzahl gleichzeitiger Abfragen auf 10 zu begrenzen. Ich vermute sehr, dass Sie 300 Abfragen haben, bei denen nur eine einzige Aufgabe von mehreren hundert tatsächlich ausgeführt wird. Die meisten OLTP-Datenverarbeitungssysteme haben aus diesem Grund absichtlich eine relativ geringe Anzahl gleichzeitiger Abfragen im Vergleich zu herkömmlichen RDS-Systemen.
ebenfalls
quelle