Parallele unendliche Java-Streams haben nicht genügend Speicher

16

Ich versuche zu verstehen, warum das folgende Java-Programm ein gibt OutOfMemoryError, während das entsprechende Programm ohne .parallel()dies nicht tut.

System.out.println(Stream
    .iterate(1, i -> i+1)
    .parallel()
    .flatMap(n -> Stream.iterate(n, i -> i+n))
    .mapToInt(Integer::intValue)
    .limit(100_000_000)
    .sum()
);

Ich habe zwei Fragen:

  1. Was ist die beabsichtigte Ausgabe dieses Programms?

    Ohne .parallel()es scheint, dass dies einfach sum(1+2+3+...)ausgegeben wird, was bedeutet, dass es einfach beim ersten Stream in der flatMap "hängen bleibt", was Sinn macht.

    Bei Parallel weiß ich nicht, ob es ein erwartetes Verhalten gibt, aber ich vermute, dass es irgendwie die ersten noder so Streams verschachtelt hat , wo nist die Anzahl der Parallelarbeiter. Es kann auch etwas anders sein, basierend auf dem Chunking / Puffering-Verhalten.

  2. Was führt dazu, dass der Speicher knapp wird? Ich versuche speziell zu verstehen, wie diese Streams unter der Haube implementiert werden.

    Ich vermute, dass etwas den Stream blockiert, so dass er nie beendet wird und die generierten Werte entfernen kann, aber ich weiß nicht genau, in welcher Reihenfolge die Dinge ausgewertet werden und wo Pufferung auftritt.

Bearbeiten: Falls es relevant ist, verwende ich Java 11.

Editt 2: Anscheinend passiert das Gleiche auch für das einfache Programm IntStream.iterate(1,i->i+1).limit(1000_000_000).parallel().sum(), also hat es möglicherweise eher mit der Faulheit von limitals mit zu tun flatMap.

Thomas Ahle
quelle
parallel () verwendet intern ForkJoinPool. Ich denke, ForkJoin Framework ist in Java von Java 7
aravind

Antworten:

9

Sie sagen: " Aber ich weiß nicht genau, in welcher Reihenfolge die Dinge ausgewertet werden und wo die Pufferung stattfindet. " Genau darum geht es bei parallelen Streams. Die Reihenfolge der Bewertung ist nicht angegeben.

Ein kritischer Aspekt Ihres Beispiels ist das .limit(100_000_000). Dies bedeutet, dass die Implementierung nicht nur beliebige Werte summieren kann, sondern die ersten 100.000.000 Zahlen summieren muss . Beachten Sie, dass in der Referenzimplementierung .unordered().limit(100_000_000)das Ergebnis nicht geändert wird, was darauf hinweist, dass es keine spezielle Implementierung für den ungeordneten Fall gibt, dies jedoch ein Implementierungsdetail ist.

Wenn Worker-Threads die Elemente verarbeiten, können sie sie nicht einfach zusammenfassen, da sie wissen müssen, welche Elemente sie verwenden dürfen, was davon abhängt, wie viele Elemente ihrer spezifischen Arbeitslast vorausgehen. Da dieser Stream die Größen nicht kennt, kann dies nur bekannt sein, wenn die Präfixelemente verarbeitet wurden, was bei unendlichen Streams niemals der Fall ist. Damit die Worker-Threads für den Moment weiter puffern, werden diese Informationen verfügbar.

Wenn ein Worker-Thread weiß, dass er den Arbeitsblock ganz links verarbeitet, kann er die Elemente im Prinzip sofort zusammenfassen, zählen und das Ende signalisieren, wenn das Limit erreicht ist. Der Stream könnte also beendet werden, dies hängt jedoch von vielen Faktoren ab.

In Ihrem Fall ist ein plausibles Szenario, dass die anderen Worker-Threads Puffer schneller zuweisen, als der Job ganz links zählt. In diesem Szenario können geringfügige Änderungen am Timing dazu führen, dass der Stream gelegentlich mit einem Wert zurückkehrt.

Wenn wir alle Worker-Threads mit Ausnahme des Threads, der den Block ganz links verarbeitet, verlangsamen, können wir den Stream beenden (zumindest in den meisten Läufen):

System.out.println(IntStream
    .iterate(1, i -> i+1)
    .parallel()
    .peek(i -> { if(i != 1) LockSupport.parkNanos(1_000_000_000); })
    .flatMap(n -> IntStream.iterate(n, i -> i+n))
    .limit(100_000_000)
    .sum()
);

¹ Ich folge einem Vorschlag von Stuart Marks , die Reihenfolge von links nach rechts zu verwenden, wenn über die Reihenfolge der Begegnung und nicht über die Verarbeitungsreihenfolge gesprochen wird.

Holger
quelle
Sehr schöne Antwort! Ich frage mich, ob überhaupt das Risiko besteht, dass alle Threads die flatMap-Operationen ausführen und keiner zugewiesen wird, um die Puffer tatsächlich zu leeren (Summieren). In meinem tatsächlichen Anwendungsfall sind die unendlichen Streams stattdessen Dateien, die zu groß sind, um im Speicher zu bleiben. Ich frage mich, wie ich den Stream neu schreiben kann, um die Speichernutzung gering zu halten.
Thomas Ahle
1
Sind Sie mit Files.lines(…)? Es wurde in Java 9 erheblich verbessert.
Holger
1
Dies ist in Java 8 der Fall. In neueren JREs wird unter BufferedReader.lines()bestimmten Umständen immer noch darauf zurückgegriffen (nicht auf das Standarddateisystem, einen speziellen Zeichensatz oder die Größe größer als Integer.MAX_FILES). Wenn eine dieser Bedingungen zutrifft, kann eine benutzerdefinierte Lösung hilfreich sein. Dies wäre eine neue Frage und
Holger
1
Integer.MAX_VALUEnatürlich…
Holger
1
Was ist der äußere Strom, ein Strom von Dateien? Hat es eine vorhersehbare Größe?
Holger
5

Meine beste Vermutung ist , dass das Hinzufügen parallel()ändert das interne Verhalten von flatMap()denen bereits hatten Probleme lazily vor der Auswertung .

Der OutOfMemoryErrorFehler, den Sie erhalten, wurde in [JDK-8202307] Abrufen eines java.lang.OutOfMemoryError: Java-Heapspeichers beim Aufrufen von Stream.iterator (). Next () in einem Stream gemeldet, der einen unendlichen / sehr großen Stream in flatMap verwendet . Wenn Sie sich das Ticket ansehen, ist es mehr oder weniger dieselbe Stapelverfolgung, die Sie erhalten. Das Ticket wurde aus folgendem Grund als Won't Fix geschlossen:

Die Methoden iterator()und spliterator()sind "Escape-Schraffuren", die verwendet werden, wenn andere Operationen nicht verwendet werden können. Sie haben einige Einschränkungen, da sie aus einem Push-Modell der Stream-Implementierung ein Pull-Modell machen. Ein solcher Übergang erfordert in bestimmten Fällen eine Pufferung, beispielsweise wenn ein Element (flach) auf zwei oder mehr Elemente abgebildet wird . Es würde die Stream-Implementierung erheblich erschweren, wahrscheinlich auf Kosten häufiger Fälle, einen Begriff des Gegendrucks zu unterstützen, um zu kommunizieren, wie viele Elemente durch verschachtelte Schichten der Elementproduktion gezogen werden sollen.

Karol Dowbecki
quelle
Das ist sehr interessant! Es ist sinnvoll, dass der Push / Pull-Übergang eine Pufferung erfordert, die den Speicher belegen kann. In meinem Fall scheint es jedoch gut zu funktionieren, wenn Sie nur Push verwenden und die verbleibenden Elemente einfach so verwerfen, wie sie erscheinen. Oder sagen Sie vielleicht, dass durch die Flapmap ein Iterator erstellt wird?
Thomas Ahle
3

OOME wird nicht dadurch verursacht , dass der Stream unendlich ist, sondern dadurch, dass dies nicht der Fall ist .

Das heißt, wenn Sie das auskommentieren .limit(...), wird ihm nie der Speicher ausgehen - aber natürlich wird es auch nie enden.

Sobald es geteilt ist, kann der Stream die Anzahl der Elemente nur verfolgen, wenn sie in jedem Thread akkumuliert sind (sieht aus wie der tatsächliche Akkumulator Spliterators$ArraySpliterator#array).

Sieht so aus, als könnten Sie es ohne reproduzieren. flatMapFühren Sie einfach Folgendes aus mit -Xmx128m:

    System.out.println(Stream
            .iterate(1, i -> i + 1)
            .parallel()
      //    .flatMap(n -> Stream.iterate(n, i -> i+n))
            .mapToInt(Integer::intValue)
            .limit(100_000_000)
            .sum()
    );

Nach dem Auskommentieren von limit()sollte es jedoch einwandfrei funktionieren, bis Sie sich entscheiden, Ihren Laptop zu schonen.

Neben den eigentlichen Implementierungsdetails passiert meiner Meinung nach Folgendes:

Mit möchte limitder sumReduzierer, dass die ersten X-Elemente summiert werden, sodass kein Thread Teilsummen ausgeben kann. Jedes "Slice" (Thread) muss Elemente akkumulieren und durchlaufen. Ohne Einschränkung gibt es keine solche Einschränkung, so dass jedes "Slice" nur die Teilsumme aus den Elementen berechnet, die es (für immer) erhält, vorausgesetzt, es gibt schließlich das Ergebnis aus.

Costi Ciudatu
quelle
Was meinst du mit "sobald es geteilt ist"? Teilt Limit es irgendwie auf?
Thomas Ahle
@ThomasAhle parallel()wird ForkJoinPoolintern verwendet, um Parallelität zu erreichen. Das Spliteratorwird verwendet, um jeder ForkJoinAufgabe Arbeit zuzuweisen. Ich denke, wir können die Arbeitseinheit hier als "geteilt" bezeichnen.
Karol Dowbecki
Aber warum passiert das nur mit Grenzen?
Thomas Ahle
@ThomasAhle Ich habe die Antwort mit meinen zwei Cent bearbeitet.
Costi Ciudatu
1
@ThomasAhle hat einen Haltepunkt festgelegt Integer.sum(), der vom IntStream.sumReduzierer verwendet wird. Sie werden sehen, dass die No-Limit-Version diese Funktion ständig aufruft, während die Limited-Version sie vor OOM nie aufruft.
Costi Ciudatu