Warum ist filter () nach flatMap () in Java-Streams "nicht vollständig" faul?

75

Ich habe den folgenden Beispielcode:

System.out.println(
       "Result: " +
        Stream.of(1, 2, 3)
                .filter(i -> {
                    System.out.println(i);
                    return true;
                })
                .findFirst()
                .get()
);
System.out.println("-----------");
System.out.println(
       "Result: " +
        Stream.of(1, 2, 3)
                .flatMap(i -> Stream.of(i - 1, i, i + 1))
                .flatMap(i -> Stream.of(i - 1, i, i + 1))
                .filter(i -> {
                    System.out.println(i);
                    return true;
                })
                .findFirst()
                .get()
);

Die Ausgabe ist wie folgt:

1
Result: 1
-----------
-1
0
1
0
1
2
1
2
3
Result: -1

Von hier aus sehe ich, dass sich der erste Fall streamwirklich träge verhält - wir verwenden ihn findFirst(), sobald wir das erste Element haben, wird unser Filter-Lambda nicht aufgerufen. Im zweiten Fall, in dem flatMaps verwendet wird, sehen wir jedoch, dass trotz des ersten Elements, das die Filterbedingung erfüllt (es ist nur jedes erste Element, da Lambda immer true zurückgibt), weitere Inhalte des Streams weiterhin durch die Filterfunktion eingespeist werden.

Ich versuche zu verstehen, warum es sich so verhält, anstatt aufzugeben, nachdem das erste Element wie im ersten Fall berechnet wurde. Alle hilfreichen Informationen wäre dankbar.

Vadym S. Khondar
quelle
11
@PhilippSander: Denn wenn es sich träge verhalten würde - wie im ersten Fall - würde es den Filter nur einmal auswerten.
Jon Skeet
4
Beachten Sie, dass Sie auch verwenden können peek: Stream.of(1, 2, 3).peek(System.out::println).filter(i -> true)...
Alexis C.
4
Beachten Sie, dass ich eine allgemeine Problemumgehung erstellt habe
Holger
9
An dem Tag, an dem diese Frage gestellt wurde, wurde ein OpenJDK-Fehler ausgelöst : bugs.openjdk.java.net/browse/JDK-8075939 . Es wurde zugewiesen, aber immer noch nicht behoben, fast ein Jahr später :(
MikeFHay
5
@MikeFHay JDK-8075939 ist für Java 10 vorgesehen. mail.openjdk.java.net/pipermail/core-libs-dev/2017-December/… für den Core-libs-dev-Überprüfungsthread und einen Link zum ersten Webrev.
Stefan Zobel

Antworten:

65

TL; DR, dies wurde in JDK-8075939 behoben und in Java 10 behoben (und in JDK-8225328 auf Java 8 zurückportiert ).

Wenn ReferencePipeline.javawir uns die Implementierung ( ) ansehen, sehen wir die Methode [ Link ]

@Override
final void forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
    do { } while (!sink.cancellationRequested() && spliterator.tryAdvance(sink));
}

welches für den findFirstBetrieb aufgerufen wird. Das Besondere ist, dass Sie sink.cancellationRequested()die Schleife beim ersten Spiel beenden können. Vergleiche mit [ Link ]

@Override
public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) {
    Objects.requireNonNull(mapper);
    // We can do better than this, by polling cancellationRequested when stream is infinite
    return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
            return new Sink.ChainedReference<P_OUT, R>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                @Override
                public void accept(P_OUT u) {
                    try (Stream<? extends R> result = mapper.apply(u)) {
                        // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
                        if (result != null)
                            result.sequential().forEach(downstream);
                    }
                }
            };
        }
    };
}

Die Methode zum Vorrücken eines Elements ruft forEachden Sub-Stream auf, ohne dass eine frühere Beendigung möglich ist, und der Kommentar am Anfang der flatMapMethode gibt sogar Auskunft über diese fehlende Funktion.

Da dies mehr als nur eine Optimierungssache ist, da dies impliziert, dass der Code einfach bricht, wenn der Substream unendlich ist, hoffe ich, dass die Entwickler bald beweisen, dass sie „besser als das können“…


Um die Auswirkungen zu veranschaulichen Stream.iterate(0, i->i+1).findFirst(), Stream.of("").flatMap(x->Stream.iterate(0, i->i+1)).findFirst()wird es , obwohl es wie erwartet funktioniert, in einer Endlosschleife enden.

In Bezug auf die Spezifikation finden Sie das meiste davon in der

Kapitel „Stream-Operationen und Pipelines“ der Paketspezifikation :

Zwischenoperationen geben einen neuen Stream zurück. Sie sind immer faul ;

… Durch Faulheit kann auch vermieden werden, dass alle Daten überprüft werden, wenn dies nicht erforderlich ist. Bei Operationen wie "Finden Sie die erste Zeichenfolge mit mehr als 1000 Zeichen" müssen Sie nur so viele Zeichenfolgen untersuchen, dass eine Zeichenfolge mit den gewünschten Eigenschaften gefunden wird, ohne alle von der Quelle verfügbaren Zeichenfolgen zu untersuchen. (Dieses Verhalten wird noch wichtiger, wenn der Eingabestream unendlich und nicht nur groß ist.)

Ferner werden einige Operationen als Kurzschlussoperationen angesehen . Eine Zwischenoperation ist kurzgeschlossen, wenn sie bei unendlicher Eingabe einen endlichen Strom erzeugen kann. Eine Terminaloperation ist kurzgeschlossen, wenn sie bei unendlicher Eingabe in endlicher Zeit beendet werden kann. Ein Kurzschlussbetrieb in der Pipeline ist eine notwendige, aber nicht ausreichende Bedingung, damit die Verarbeitung eines unendlichen Stroms normal in endlicher Zeit endet.

Es ist klar, dass eine Kurzschlussoperation keine endliche Zeitbeendigung garantiert, z. B. wenn ein Filter keinem Element entspricht, das die Verarbeitung nicht abschließen kann, sondern eine Implementierung, die keine Beendigung in endlicher Zeit durch einfaches Ignorieren unterstützt Der Kurzschlusscharakter einer Operation liegt weit außerhalb der Spezifikation.

Holger
quelle
27
Das ist ein Fehler. Es mag zwar zutreffen, dass die Spezifikation dieses Verhalten unterstützt, aber niemand erwartet, dass das Abrufen des ersten Elements eines unendlichen Streams einen StackOverflowError auslöst oder in einer Endlosschleife endet, unabhängig davon, ob es direkt von der Quelle der Pipeline oder stammt von einem verschachtelten Stream über eine Zuordnungsfunktion. Dies sollte als Fehler gemeldet werden.
fps
5
@Vadym S. Khondar: Das Einreichen eines Fehlerberichts ist eine gute Idee. In Bezug darauf, warum jemand dies vorher nicht bemerkt hat, habe ich schon viele Fehler gesehen, bei denen ich nicht glauben kann, dass ich der erste bin, der diese Fehler bemerkt. Sofern nicht unendlich viele Streams beteiligt sind, hat dieser Fehler nur Auswirkungen auf die Leistung, die in vielen Anwendungsfällen möglicherweise unbemerkt bleiben.
Holger
7
@ Marko Topolnik: Die Eigenschaft "beginnt erst, wenn der Terminalbetrieb der Pipeline ausgeführt wird" negiert keine anderen Eigenschaften von verzögerten Operationen. Ich weiß, dass es keine einsatzige Erklärung der besprochenen Eigenschaft gibt, sonst habe ich sie zitiert. Innerhalb der StreamAPI doc wird gesagt , dass „Streams sind faul; Die Berechnung der Quelldaten wird nur durchgeführt, wenn die Terminaloperation gestartet wird, und die Quellelemente werden nur nach Bedarf verwendet . “
Holger
6
Sie können noch einmal fragen, ob dies eine Garantie für eine verzögerte Ausführung in Bezug auf Kurzschlüsse impliziert. Ich sehe dies jedoch eher umgekehrt: Es wird zu keinem Zeitpunkt gesagt, dass Implementierungen frei sind, nicht faul zu handeln, wie wir es hier sehen. Und die Spezifikation ist sehr erschöpfend in Bezug darauf, was zulässig ist und was nicht.
Holger
5
JDK-8075939 macht jetzt Fortschritte. Unter mail.openjdk.java.net/pipermail/core-libs-dev/2017-December/… finden Sie den Überprüfungsthread für core-libs-dev und einen Link zum ersten Webrev. Es scheint, wir werden es in Java 10 sehen.
Stefan Zobel
17

Die Elemente des Eingabestreams werden nacheinander träge verbraucht. Das erste Element 1wird von den beiden flatMaps in den Stream umgewandelt -1, 0, 1, 0, 1, 2, 1, 2, 3, so dass der gesamte Stream nur dem ersten Eingabeelement entspricht. Die verschachtelten Ströme werden von der Pipeline eifrig materialisiert, dann abgeflacht und dann der filterBühne zugeführt . Dies erklärt Ihre Ausgabe.

Das Obige beruht nicht auf einer grundlegenden Einschränkung, aber es würde die Dinge wahrscheinlich viel komplizierter machen, wenn verschachtelte Streams vollständig faul sind. Ich vermute, es wäre eine noch größere Herausforderung, es performant zu machen.

Zum Vergleich erhalten Clojures faule Sequenzen für jede dieser Verschachtelungsebenen eine weitere Umhüllungsschicht. Aufgrund dieser Konstruktion können die Operationen sogar fehlschlagen, StackOverflowErrorwenn die Verschachtelung extrem ausgeführt wird.

Marko Topolnik
quelle
2
@ MarkoTopolnik, danke für deine Antwort. Eigentlich ist die Besorgnis von Holger Grund für meine Überraschung. Bedeutet der zweite Fall, dass ich flatMap nicht für unendliche Streams verwenden kann?
Vadym S. Khondar
Ja, ich wette, dass der verschachtelte Stream nicht unendlich sein kann.
Marko Topolnik
8

In Bezug auf das Brechen mit unendlichen Teilströmen wird das Verhalten von flatMap noch überraschender, wenn man einen Zwischenschluss (im Gegensatz zum Terminal) kurzschließt.

Während das Folgende wie erwartet funktioniert, drucken Sie die unendliche Folge von ganzen Zahlen aus

Stream.of("x").flatMap(_x -> Stream.iterate(1, i -> i + 1)).forEach(System.out::println);

Der folgende Code gibt nur die "1" aus, wird jedoch immer noch nicht beendet:

Stream.of("x").flatMap(_x -> Stream.iterate(1, i -> i + 1)).limit(1).forEach(System.out::println);

Ich kann mir keine Lektüre der Spezifikation vorstellen, in der das kein Fehler war.

Sebastian
quelle
6

In meiner kostenlosen StreamEx- Bibliothek habe ich die Kurzschlusssammler vorgestellt. Beim Sammeln eines sequentiellen Stroms mit einem kurzgeschlossenen Kollektor (wie MoreCollectors.first()) wird genau ein Element von der Quelle verbraucht. Intern ist es ziemlich schmutzig implementiert: Verwenden einer benutzerdefinierten Ausnahme, um den Kontrollfluss zu unterbrechen. Mit meiner Bibliothek könnte Ihr Beispiel folgendermaßen umgeschrieben werden:

System.out.println(
        "Result: " +
                StreamEx.of(1, 2, 3)
                .flatMap(i -> Stream.of(i - 1, i, i + 1))
                .flatMap(i -> Stream.of(i - 1, i, i + 1))
                .filter(i -> {
                    System.out.println(i);
                    return true;
                })
                .collect(MoreCollectors.first())
                .get()
        );

Das Ergebnis ist folgendes:

-1
Result: -1
Tagir Valeev
quelle
0

Ich stimme anderen Leuten zu, dass dies ein Fehler ist, der bei JDK-8075939 geöffnet wurde . Und da es noch nicht mehr als ein Jahr später behoben ist. Ich möchte Ihnen empfehlen: AbacusUtil

N.println("Result: " + Stream.of(1, 2, 3).peek(N::println).first().get());

N.println("-----------");

N.println("Result: " + Stream.of(1, 2, 3)
                        .flatMap(i -> Stream.of(i - 1, i, i + 1))
                        .flatMap(i -> Stream.of(i - 1, i, i + 1))
                        .peek(N::println).first().get());

// output:
// 1
// Result: 1
// -----------
// -1
// Result: -1

Offenlegung: Ich bin der Entwickler von AbacusUtil.

user_3380739
quelle
0

Heute bin ich auch auf diesen Fehler gestoßen. Das Verhalten ist nicht so direkt, da ein einfacher Fall wie unten gut funktioniert, aber ein ähnlicher Produktionscode nicht funktioniert.

 stream(spliterator).map(o -> o).flatMap(Stream::of).flatMap(Stream::of).findAny()

Für Leute, die nicht noch ein paar Jahre auf die Migration zu JDK-10 warten können, gibt es einen alternativen echten Lazy Stream. Parallel wird nicht unterstützt. Es war für die JavaScript-Übersetzung vorgesehen, hat aber für mich funktioniert, da die Benutzeroberfläche dieselbe ist.

StreamHelper basiert auf Sammlungen, aber es ist einfach, Spliterator anzupassen.

https://github.com/yaitskov/j4ts/blob/stream/src/main/java/javaemul/internal/stream/StreamHelper.java

Daneel Yaitskov
quelle