(Warum) müssen wir den Cache aufrufen oder auf einer RDD bestehen bleiben?

171

Wenn ein ausfallsicheres verteiltes Dataset (RDD) aus einer Textdatei oder -sammlung (oder aus einem anderen RDD) erstellt wird, müssen wir "Cache" oder "Persist" explizit aufrufen, um die RDD-Daten im Speicher zu speichern? Oder werden die RDD-Daten standardmäßig verteilt im Speicher gespeichert?

val textFile = sc.textFile("/user/emp.txt")

Nach meinem Verständnis ist textFile nach dem obigen Schritt eine RDD und im gesamten / einigen Speicher des Knotens verfügbar.

Wenn ja, warum müssen wir dann "cache" oder "persist" auf textFile RDD aufrufen?

Ramana
quelle

Antworten:

300

Die meisten RDD-Operationen sind faul. Stellen Sie sich eine RDD als Beschreibung einer Reihe von Operationen vor. Ein RDD sind keine Daten. Also diese Zeile:

val textFile = sc.textFile("/user/emp.txt")

Es macht nichts. Es wird eine RDD erstellt, die besagt, dass diese Datei geladen werden muss. Die Datei wird zu diesem Zeitpunkt nicht geladen.

RDD-Vorgänge, bei denen der Inhalt der Daten überwacht werden muss, können nicht verzögert werden. (Diese werden als Aktionen bezeichnet .) Ein Beispiel ist RDD.count: Um die Anzahl der Zeilen in der Datei anzugeben, muss die Datei gelesen werden. Wenn Sie also schreiben textFile.count, wird zu diesem Zeitpunkt die Datei gelesen, die Zeilen werden gezählt und die Anzahl wird zurückgegeben.

Was ist, wenn Sie textFile.counterneut anrufen ? Das Gleiche: Die Datei wird erneut gelesen und gezählt. Es wird nichts gespeichert. Ein RDD sind keine Daten.

Was macht RDD.cachealso? Wenn Sie textFile.cacheden obigen Code hinzufügen :

val textFile = sc.textFile("/user/emp.txt")
textFile.cache

Es macht nichts. RDD.cacheist auch eine faule Operation. Die Datei wird immer noch nicht gelesen. Aber jetzt sagt das RDD "Lies diese Datei und speichere dann den Inhalt zwischen". Wenn Sie dann textFile.countdas erste Mal ausführen , wird die Datei geladen, zwischengespeichert und gezählt. Wenn Sie textFile.countein zweites Mal aufrufen , verwendet der Vorgang den Cache. Es werden nur die Daten aus dem Cache genommen und die Zeilen gezählt.

Das Cache-Verhalten hängt vom verfügbaren Speicher ab. Wenn die Datei beispielsweise nicht in den Speicher passt, textFile.countwird auf das übliche Verhalten zurückgegriffen und die Datei erneut gelesen.

Daniel Darabos
quelle
4
Hallo Daniel, wenn Sie den Cache aufrufen, bedeutet dies, dass die RDD nicht aus der Quelle neu geladen wird (z. B. Textdatei) - wie können Sie sicher sein, dass die Daten aus der Textdatei aktuell sind, wenn sie zwischengespeichert werden? (Findet Spark dies heraus oder ist es eine manuelle Operation, um () regelmäßig aufzuheben (), um sicherzustellen, dass die Quelldaten später in der Linie neu
berechnet werden
Müssen Sie - wenn Sie die Aufhebung regelmäßig aufheben müssen - wenn Sie eine zwischengespeicherte Festplatte haben, die von einer anderen zwischengespeicherten RDD abhängig ist, beide RDDs aufheben müssen, um neu berechnete Ergebnisse zu sehen?
Andrew.butkus
21
Spark geht nur davon aus, dass sich die Datei niemals ändern wird. Es liest die Datei zu einem beliebigen Zeitpunkt und kann Teile davon bei Bedarf später erneut lesen. (Zum Beispiel, wenn ein Teil der Daten aus dem Cache verschoben wurde.) Sie sollten also Ihre Dateien unverändert lassen! Erstellen Sie einfach eine neue Datei mit einem neuen Namen, wenn Sie neue Daten haben, und laden Sie sie dann als neue RDD. Wenn Sie ständig neue Daten erhalten, schauen Sie sich Spark Streaming an.
Daniel Darabos
10
Ja. RDDs sind unveränderlich, daher geht jeder RDD davon aus, dass seine Abhängigkeiten ebenfalls unveränderlich sind. Mit Spark Streaming können Sie solche Bäume einrichten, die mit einem Strom von Änderungen arbeiten. Eine noch einfachere Lösung besteht darin, den Baum in einer Funktion zu erstellen, deren Parameter ein Dateiname ist. Rufen Sie dann einfach die Funktion für die neue Datei und den neuen Poof auf. Sie haben den neuen Berechnungsbaum.
Daniel Darabos
1
@ Humoyun: Auf der Registerkarte Speicher der Spark-Benutzeroberfläche können Sie sehen, wie viel von jedem RDD zwischengespeichert wird. Die Daten sind möglicherweise so groß, dass nur 40% in den Gesamtspeicher passen, den Sie für das Caching haben. In diesem Fall besteht perisisteine Option darin, eine Speicheroption zu verwenden und auszuwählen, mit der die Cache-Daten auf die Festplatte übertragen werden können.
Daniel Darabos
197

Ich denke, die Frage wäre besser formuliert als:

Wann müssen wir den Cache aufrufen oder auf einer RDD bestehen bleiben?

Spark-Prozesse sind faul, das heißt, nichts wird passieren, bis es erforderlich ist. Um die Frage schnell zu beantworten val textFile = sc.textFile("/user/emp.txt"), passiert nach der Ausgabe nichts mit den Daten, sondern nur eine HadoopRDD, die die Datei als Quelle verwendet.

Nehmen wir an, wir transformieren diese Daten ein wenig:

val wordsRDD = textFile.flatMap(line => line.split("\\W"))

Auch hier passiert nichts mit den Daten. Jetzt gibt es eine neue RDD wordsRDD, die einen Verweis auf testFileund eine Funktion enthält, die bei Bedarf angewendet werden kann.

Nur wenn eine Aktion für eine RDD aufgerufen wird, wie z. B. wordsRDD.countdie RDD-Kette, die als Linie bezeichnet wird, wird sie ausgeführt. Das heißt, die in Partitionen aufgeschlüsselten Daten werden von den Ausführenden des Spark-Clusters geladen, die flatMapFunktion wird angewendet und das Ergebnis wird berechnet.

Auf einer linearen Linie wie der in diesem Beispiel cache()wird nicht benötigt. Die Daten werden in die Ausführenden geladen, alle Transformationen werden angewendet und schließlich countwerden die Transformationen berechnet, alle im Speicher - wenn die Daten in den Speicher passen.

cacheist nützlich, wenn sich die Abstammungslinie des RDD verzweigt. Angenommen, Sie möchten die Wörter des vorherigen Beispiels in eine Anzahl für positive und negative Wörter filtern. Sie könnten dies so tun:

val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count()
val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count()

Hier gibt jeder Zweig ein erneutes Laden der Daten aus. Durch Hinzufügen einer expliziten cacheAnweisung wird sichergestellt, dass die zuvor durchgeführte Verarbeitung beibehalten und wiederverwendet wird. Der Job wird so aussehen:

val textFile = sc.textFile("/user/emp.txt")
val wordsRDD = textFile.flatMap(line => line.split("\\W"))
wordsRDD.cache()
val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count()
val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count()

Aus diesem Grund cachewird gesagt, "die Linie zu brechen", da es einen Prüfpunkt schafft, der für die weitere Verarbeitung wiederverwendet werden kann.

Faustregel: Verwenden cacheSie diese Option , wenn sich die Abstammungslinie Ihres RDD verzweigt oder wenn ein RDD wie in einer Schleife mehrmals verwendet wird.

maasg
quelle
1
Genial. Vielen Dank. Noch eine verwandte Frage. Wenn wir zwischenspeichern oder beibehalten, werden Daten im Speicher des Ausführenden oder im Arbeiterknoten gespeichert. Wenn es sich um den Speicher des Executors handelt, identifiziert How Spark, welcher Executor über die Daten verfügt.
Ramana
1
@RamanaUppala Der Executor-Speicher wird verwendet. Der Anteil des Executor-Speichers, der für das Caching verwendet wird, wird von der Konfiguration gesteuert spark.storage.memoryFraction. In Bezug darauf, welcher Executor über welche Daten verfügt, verfolgt ein RDD seine Partitionen, die auf den Executoren verteilt sind.
Maasg
5
@maasg Korrigiere mich, wenn ich falsch liege, aber weder cachenoch die Linie brechenpersist kann .
Null 323
Wo würden die Wörter RDD gespeichert, wenn wir die Anweisung .cache () im obigen Beispiel nicht gehabt hätten?
sun_dare
Was ist, wenn wir vor den beiden Zählungen die beiden Zweige wieder zu einer Festplatte vereinen und zählen? Ist in diesem Fall der Cache von Vorteil?
Xiawei Zhang
30

Müssen wir "Cache" oder "Persist" explizit aufrufen, um die RDD-Daten im Speicher zu speichern?

Ja, nur bei Bedarf.

Die RDD-Daten werden standardmäßig verteilt im Speicher gespeichert?

Nein!

Und das sind die Gründe warum:

  • Spark unterstützt zwei Arten von gemeinsam genutzten Variablen: Broadcast-Variablen, mit denen ein Wert im Speicher auf allen Knoten zwischengespeichert werden kann, und Akkumulatoren, bei denen es sich um Variablen handelt, die nur „hinzugefügt“ werden, z. B. Zähler und Summen.

  • RDDs unterstützen zwei Arten von Operationen: Transformationen, die ein neues Dataset aus einem vorhandenen erstellen, und Aktionen, die nach dem Ausführen einer Berechnung für das Dataset einen Wert an das Treiberprogramm zurückgeben. Map ist beispielsweise eine Transformation, die jedes Datensatzelement durch eine Funktion leitet und eine neue RDD zurückgibt, die die Ergebnisse darstellt. Auf der anderen Seite ist Reduzieren eine Aktion, die alle Elemente der RDD mithilfe einer Funktion aggregiert und das Endergebnis an das Treiberprogramm zurückgibt (obwohl es auch einen parallelen reduzierten ByKey gibt, der ein verteiltes Dataset zurückgibt).

  • Alle Transformationen in Spark sind insofern faul, als sie ihre Ergebnisse nicht sofort berechnen. Stattdessen erinnern sie sich nur an die Transformationen, die auf einen Basisdatensatz (z. B. eine Datei) angewendet wurden. Die Transformationen werden nur berechnet, wenn für eine Aktion ein Ergebnis an das Treiberprogramm zurückgegeben werden muss. Durch dieses Design kann Spark effizienter ausgeführt werden. Beispielsweise können wir erkennen, dass ein über die Karte erstelltes Dataset für eine Reduzierung verwendet wird und nur das Ergebnis der Reduzierung an den Treiber zurückgibt, anstatt das größere zugeordnete Dataset.

  • Standardmäßig kann jede transformierte RDD jedes Mal neu berechnet werden, wenn Sie eine Aktion darauf ausführen. Sie können eine RDD jedoch auch mithilfe der Persist- (oder Cache-) Methode im Speicher beibehalten. In diesem Fall behält Spark die Elemente im Cluster bei, um beim nächsten Abfragen einen viel schnelleren Zugriff zu ermöglichen. Es gibt auch Unterstützung für persistente RDDs auf der Festplatte oder für die Replikation über mehrere Knoten.

Weitere Informationen finden Sie in der Spark-Programmieranleitung .

Eliasah
quelle
1
Das hat meine Frage nicht beantwortet.
Ramana
Was beantwortet es nicht?
Eliasah
1
Warum müssen wir Cache oder Persist aufrufen, wenn die RDD-Daten im Standardspeicher gespeichert sind?
Ramana
RDDs werden standardmäßig nicht im Speicher gespeichert.
Wenn Sie
2
Es ist eine gute Antwort, ich weiß nicht, warum es abgelehnt wurde. Es ist eine Top-Down-Antwort, die erklärt, wie RDDs anhand der übergeordneten Konzepte funktionieren. Ich habe eine weitere Antwort hinzugefügt, die von unten nach oben geht: Ausgehend von "Was macht diese Zeile?". Vielleicht ist es einfacher, jemandem zu folgen, der gerade erst mit Spark anfängt.
Daniel Darabos
11

Im Folgenden sind die drei Situationen aufgeführt, in denen Sie Ihre RDDs zwischenspeichern sollten:

viele Male mit einem RDD

Ausführen mehrerer Aktionen auf demselben RDD

für lange Ketten von (oder sehr teuren) Transformationen

Rileyss
quelle
7

Hinzufügen eines weiteren Grundes zum Hinzufügen (oder vorübergehenden Hinzufügen) eines cacheMethodenaufrufs.

für Debug-Speicherprobleme

Mit der cacheMethode gibt spark Debugging-Informationen zur Größe des RDD. In der funkenintegrierten Benutzeroberfläche erhalten Sie Informationen zum RDD-Speicherverbrauch. und dies erwies sich als sehr hilfreich bei der Diagnose von Speicherproblemen.

Zicken
quelle