Ich versuche zu verstehen, warum das folgende Java-Programm ein gibt OutOfMemoryError
, während das entsprechende Programm ohne .parallel()
dies nicht tut.
System.out.println(Stream
.iterate(1, i -> i+1)
.parallel()
.flatMap(n -> Stream.iterate(n, i -> i+n))
.mapToInt(Integer::intValue)
.limit(100_000_000)
.sum()
);
Ich habe zwei Fragen:
Was ist die beabsichtigte Ausgabe dieses Programms?
Ohne
.parallel()
es scheint, dass dies einfachsum(1+2+3+...)
ausgegeben wird, was bedeutet, dass es einfach beim ersten Stream in der flatMap "hängen bleibt", was Sinn macht.Bei Parallel weiß ich nicht, ob es ein erwartetes Verhalten gibt, aber ich vermute, dass es irgendwie die ersten
n
oder so Streams verschachtelt hat , won
ist die Anzahl der Parallelarbeiter. Es kann auch etwas anders sein, basierend auf dem Chunking / Puffering-Verhalten.Was führt dazu, dass der Speicher knapp wird? Ich versuche speziell zu verstehen, wie diese Streams unter der Haube implementiert werden.
Ich vermute, dass etwas den Stream blockiert, so dass er nie beendet wird und die generierten Werte entfernen kann, aber ich weiß nicht genau, in welcher Reihenfolge die Dinge ausgewertet werden und wo Pufferung auftritt.
Bearbeiten: Falls es relevant ist, verwende ich Java 11.
Editt 2: Anscheinend passiert das Gleiche auch für das einfache Programm IntStream.iterate(1,i->i+1).limit(1000_000_000).parallel().sum()
, also hat es möglicherweise eher mit der Faulheit von limit
als mit zu tun flatMap
.
quelle
Antworten:
Sie sagen: " Aber ich weiß nicht genau, in welcher Reihenfolge die Dinge ausgewertet werden und wo die Pufferung stattfindet. " Genau darum geht es bei parallelen Streams. Die Reihenfolge der Bewertung ist nicht angegeben.
Ein kritischer Aspekt Ihres Beispiels ist das
.limit(100_000_000)
. Dies bedeutet, dass die Implementierung nicht nur beliebige Werte summieren kann, sondern die ersten 100.000.000 Zahlen summieren muss . Beachten Sie, dass in der Referenzimplementierung.unordered().limit(100_000_000)
das Ergebnis nicht geändert wird, was darauf hinweist, dass es keine spezielle Implementierung für den ungeordneten Fall gibt, dies jedoch ein Implementierungsdetail ist.Wenn Worker-Threads die Elemente verarbeiten, können sie sie nicht einfach zusammenfassen, da sie wissen müssen, welche Elemente sie verwenden dürfen, was davon abhängt, wie viele Elemente ihrer spezifischen Arbeitslast vorausgehen. Da dieser Stream die Größen nicht kennt, kann dies nur bekannt sein, wenn die Präfixelemente verarbeitet wurden, was bei unendlichen Streams niemals der Fall ist. Damit die Worker-Threads für den Moment weiter puffern, werden diese Informationen verfügbar.
Wenn ein Worker-Thread weiß, dass er den Arbeitsblock ganz links verarbeitet, kann er die Elemente im Prinzip sofort zusammenfassen, zählen und das Ende signalisieren, wenn das Limit erreicht ist. Der Stream könnte also beendet werden, dies hängt jedoch von vielen Faktoren ab.
In Ihrem Fall ist ein plausibles Szenario, dass die anderen Worker-Threads Puffer schneller zuweisen, als der Job ganz links zählt. In diesem Szenario können geringfügige Änderungen am Timing dazu führen, dass der Stream gelegentlich mit einem Wert zurückkehrt.
Wenn wir alle Worker-Threads mit Ausnahme des Threads, der den Block ganz links verarbeitet, verlangsamen, können wir den Stream beenden (zumindest in den meisten Läufen):
¹ Ich folge einem Vorschlag von Stuart Marks , die Reihenfolge von links nach rechts zu verwenden, wenn über die Reihenfolge der Begegnung und nicht über die Verarbeitungsreihenfolge gesprochen wird.
quelle
Files.lines(…)
? Es wurde in Java 9 erheblich verbessert.BufferedReader.lines()
bestimmten Umständen immer noch darauf zurückgegriffen (nicht auf das Standarddateisystem, einen speziellen Zeichensatz oder die Größe größer alsInteger.MAX_FILES
). Wenn eine dieser Bedingungen zutrifft, kann eine benutzerdefinierte Lösung hilfreich sein. Dies wäre eine neue Frage undInteger.MAX_VALUE
natürlich…Meine beste Vermutung ist , dass das Hinzufügen
parallel()
ändert das interne Verhalten vonflatMap()
denen bereits hatten Probleme lazily vor der Auswertung .Der
OutOfMemoryError
Fehler, den Sie erhalten, wurde in [JDK-8202307] Abrufen eines java.lang.OutOfMemoryError: Java-Heapspeichers beim Aufrufen von Stream.iterator (). Next () in einem Stream gemeldet, der einen unendlichen / sehr großen Stream in flatMap verwendet . Wenn Sie sich das Ticket ansehen, ist es mehr oder weniger dieselbe Stapelverfolgung, die Sie erhalten. Das Ticket wurde aus folgendem Grund als Won't Fix geschlossen:quelle
OOME wird nicht dadurch verursacht , dass der Stream unendlich ist, sondern dadurch, dass dies nicht der Fall ist .
Das heißt, wenn Sie das auskommentieren
.limit(...)
, wird ihm nie der Speicher ausgehen - aber natürlich wird es auch nie enden.Sobald es geteilt ist, kann der Stream die Anzahl der Elemente nur verfolgen, wenn sie in jedem Thread akkumuliert sind (sieht aus wie der tatsächliche Akkumulator
Spliterators$ArraySpliterator#array
).Sieht so aus, als könnten Sie es ohne reproduzieren.
flatMap
Führen Sie einfach Folgendes aus mit-Xmx128m
:Nach dem Auskommentieren von
limit()
sollte es jedoch einwandfrei funktionieren, bis Sie sich entscheiden, Ihren Laptop zu schonen.Neben den eigentlichen Implementierungsdetails passiert meiner Meinung nach Folgendes:
Mit möchte
limit
dersum
Reduzierer, dass die ersten X-Elemente summiert werden, sodass kein Thread Teilsummen ausgeben kann. Jedes "Slice" (Thread) muss Elemente akkumulieren und durchlaufen. Ohne Einschränkung gibt es keine solche Einschränkung, so dass jedes "Slice" nur die Teilsumme aus den Elementen berechnet, die es (für immer) erhält, vorausgesetzt, es gibt schließlich das Ergebnis aus.quelle
parallel()
wirdForkJoinPool
intern verwendet, um Parallelität zu erreichen. DasSpliterator
wird verwendet, um jederForkJoin
Aufgabe Arbeit zuzuweisen. Ich denke, wir können die Arbeitseinheit hier als "geteilt" bezeichnen.Integer.sum()
, der vomIntStream.sum
Reduzierer verwendet wird. Sie werden sehen, dass die No-Limit-Version diese Funktion ständig aufruft, während die Limited-Version sie vor OOM nie aufruft.