Die meisten RDD-Operationen sind faul. Stellen Sie sich eine RDD als Beschreibung einer Reihe von Operationen vor. Ein RDD sind keine Daten. Also diese Zeile:
val textFile = sc.textFile("/user/emp.txt")
Es macht nichts. Es wird eine RDD erstellt, die besagt, dass diese Datei geladen werden muss. Die Datei wird zu diesem Zeitpunkt nicht geladen.
RDD-Vorgänge, bei denen der Inhalt der Daten überwacht werden muss, können nicht verzögert werden. (Diese werden als Aktionen bezeichnet .) Ein Beispiel ist RDD.count
: Um die Anzahl der Zeilen in der Datei anzugeben, muss die Datei gelesen werden. Wenn Sie also schreiben textFile.count
, wird zu diesem Zeitpunkt die Datei gelesen, die Zeilen werden gezählt und die Anzahl wird zurückgegeben.
Was ist, wenn Sie textFile.count
erneut anrufen ? Das Gleiche: Die Datei wird erneut gelesen und gezählt. Es wird nichts gespeichert. Ein RDD sind keine Daten.
Was macht RDD.cache
also? Wenn Sie textFile.cache
den obigen Code hinzufügen :
val textFile = sc.textFile("/user/emp.txt")
textFile.cache
Es macht nichts. RDD.cache
ist auch eine faule Operation. Die Datei wird immer noch nicht gelesen. Aber jetzt sagt das RDD "Lies diese Datei und speichere dann den Inhalt zwischen". Wenn Sie dann textFile.count
das erste Mal ausführen , wird die Datei geladen, zwischengespeichert und gezählt. Wenn Sie textFile.count
ein zweites Mal aufrufen , verwendet der Vorgang den Cache. Es werden nur die Daten aus dem Cache genommen und die Zeilen gezählt.
Das Cache-Verhalten hängt vom verfügbaren Speicher ab. Wenn die Datei beispielsweise nicht in den Speicher passt, textFile.count
wird auf das übliche Verhalten zurückgegriffen und die Datei erneut gelesen.
perisist
eine Option darin, eine Speicheroption zu verwenden und auszuwählen, mit der die Cache-Daten auf die Festplatte übertragen werden können.Ich denke, die Frage wäre besser formuliert als:
Wann müssen wir den Cache aufrufen oder auf einer RDD bestehen bleiben?
Spark-Prozesse sind faul, das heißt, nichts wird passieren, bis es erforderlich ist. Um die Frage schnell zu beantworten
val textFile = sc.textFile("/user/emp.txt")
, passiert nach der Ausgabe nichts mit den Daten, sondern nur eineHadoopRDD
, die die Datei als Quelle verwendet.Nehmen wir an, wir transformieren diese Daten ein wenig:
Auch hier passiert nichts mit den Daten. Jetzt gibt es eine neue RDD
wordsRDD
, die einen Verweis auftestFile
und eine Funktion enthält, die bei Bedarf angewendet werden kann.Nur wenn eine Aktion für eine RDD aufgerufen wird, wie z. B.
wordsRDD.count
die RDD-Kette, die als Linie bezeichnet wird, wird sie ausgeführt. Das heißt, die in Partitionen aufgeschlüsselten Daten werden von den Ausführenden des Spark-Clusters geladen, dieflatMap
Funktion wird angewendet und das Ergebnis wird berechnet.Auf einer linearen Linie wie der in diesem Beispiel
cache()
wird nicht benötigt. Die Daten werden in die Ausführenden geladen, alle Transformationen werden angewendet und schließlichcount
werden die Transformationen berechnet, alle im Speicher - wenn die Daten in den Speicher passen.cache
ist nützlich, wenn sich die Abstammungslinie des RDD verzweigt. Angenommen, Sie möchten die Wörter des vorherigen Beispiels in eine Anzahl für positive und negative Wörter filtern. Sie könnten dies so tun:Hier gibt jeder Zweig ein erneutes Laden der Daten aus. Durch Hinzufügen einer expliziten
cache
Anweisung wird sichergestellt, dass die zuvor durchgeführte Verarbeitung beibehalten und wiederverwendet wird. Der Job wird so aussehen:Aus diesem Grund
cache
wird gesagt, "die Linie zu brechen", da es einen Prüfpunkt schafft, der für die weitere Verarbeitung wiederverwendet werden kann.Faustregel: Verwenden
cache
Sie diese Option , wenn sich die Abstammungslinie Ihres RDD verzweigt oder wenn ein RDD wie in einer Schleife mehrmals verwendet wird.quelle
spark.storage.memoryFraction
. In Bezug darauf, welcher Executor über welche Daten verfügt, verfolgt ein RDD seine Partitionen, die auf den Executoren verteilt sind.cache
noch die Linie brechenpersist
kann .Müssen wir "Cache" oder "Persist" explizit aufrufen, um die RDD-Daten im Speicher zu speichern?
Ja, nur bei Bedarf.
Die RDD-Daten werden standardmäßig verteilt im Speicher gespeichert?
Nein!
Und das sind die Gründe warum:
Spark unterstützt zwei Arten von gemeinsam genutzten Variablen: Broadcast-Variablen, mit denen ein Wert im Speicher auf allen Knoten zwischengespeichert werden kann, und Akkumulatoren, bei denen es sich um Variablen handelt, die nur „hinzugefügt“ werden, z. B. Zähler und Summen.
RDDs unterstützen zwei Arten von Operationen: Transformationen, die ein neues Dataset aus einem vorhandenen erstellen, und Aktionen, die nach dem Ausführen einer Berechnung für das Dataset einen Wert an das Treiberprogramm zurückgeben. Map ist beispielsweise eine Transformation, die jedes Datensatzelement durch eine Funktion leitet und eine neue RDD zurückgibt, die die Ergebnisse darstellt. Auf der anderen Seite ist Reduzieren eine Aktion, die alle Elemente der RDD mithilfe einer Funktion aggregiert und das Endergebnis an das Treiberprogramm zurückgibt (obwohl es auch einen parallelen reduzierten ByKey gibt, der ein verteiltes Dataset zurückgibt).
Alle Transformationen in Spark sind insofern faul, als sie ihre Ergebnisse nicht sofort berechnen. Stattdessen erinnern sie sich nur an die Transformationen, die auf einen Basisdatensatz (z. B. eine Datei) angewendet wurden. Die Transformationen werden nur berechnet, wenn für eine Aktion ein Ergebnis an das Treiberprogramm zurückgegeben werden muss. Durch dieses Design kann Spark effizienter ausgeführt werden. Beispielsweise können wir erkennen, dass ein über die Karte erstelltes Dataset für eine Reduzierung verwendet wird und nur das Ergebnis der Reduzierung an den Treiber zurückgibt, anstatt das größere zugeordnete Dataset.
Standardmäßig kann jede transformierte RDD jedes Mal neu berechnet werden, wenn Sie eine Aktion darauf ausführen. Sie können eine RDD jedoch auch mithilfe der Persist- (oder Cache-) Methode im Speicher beibehalten. In diesem Fall behält Spark die Elemente im Cluster bei, um beim nächsten Abfragen einen viel schnelleren Zugriff zu ermöglichen. Es gibt auch Unterstützung für persistente RDDs auf der Festplatte oder für die Replikation über mehrere Knoten.
Weitere Informationen finden Sie in der Spark-Programmieranleitung .
quelle
Im Folgenden sind die drei Situationen aufgeführt, in denen Sie Ihre RDDs zwischenspeichern sollten:
quelle
Hinzufügen eines weiteren Grundes zum Hinzufügen (oder vorübergehenden Hinzufügen) eines
cache
Methodenaufrufs.für Debug-Speicherprobleme
Mit der
cache
Methode gibt spark Debugging-Informationen zur Größe des RDD. In der funkenintegrierten Benutzeroberfläche erhalten Sie Informationen zum RDD-Speicherverbrauch. und dies erwies sich als sehr hilfreich bei der Diagnose von Speicherproblemen.quelle