takeWhile () arbeitet anders mit flatmap

75

Ich erstelle mit takeWhile Snippets, um die Möglichkeiten zu erkunden. In Verbindung mit flatMap entspricht das Verhalten nicht den Erwartungen. Das Code-Snippet finden Sie unten.

String[][] strArray = {{"Sample1", "Sample2"}, {"Sample3", "Sample4", "Sample5"}};

Arrays.stream(strArray)
        .flatMap(indStream -> Arrays.stream(indStream))
        .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"))
        .forEach(ele -> System.out.println(ele));

Tatsächliche Ausgabe:

Sample1
Sample2
Sample3
Sample5

Erwartete Ausgabe:

Sample1
Sample2
Sample3

Grund für die Erwartung ist, dass takeWhile ausgeführt werden sollte, bis die Bedingung im Inneren wahr wird. Ich habe auch Ausdruckanweisungen in Flatmap zum Debuggen hinzugefügt. Die Streams werden nur zweimal zurückgegeben, was der Erwartung entspricht.

Dies funktioniert jedoch ohne Flatmap in der Kette einwandfrei.

String[] strArraySingle = {"Sample3", "Sample4", "Sample5"};
Arrays.stream(strArraySingle)
        .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"))
        .forEach(ele -> System.out.println(ele));

Tatsächliche Ausgabe:

Sample3

Hier stimmt die tatsächliche Ausgabe mit der erwarteten Ausgabe überein.

Haftungsausschluss: Diese Snippets dienen nur zum Üben von Code und dienen keinen gültigen Verwendungszwecken.

Update: Bug JDK-8193856 : Fix wird als Teil des JDK 10. verfügbar sein Die Änderung wird zu korrigieren whileOps Sink :: akzeptieren

@Override 
public void accept(T t) {
    if (take = predicate.test(t)) {
        downstream.accept(t);
    }
}

Geänderte Implementierung:

@Override
public void accept(T t) {
    if (take && (take = predicate.test(t))) {
        downstream.accept(t);
    }
}
Jeevan Varughese
quelle

Antworten:

54

Dies ist ein Fehler in JDK 9 - ab Problem # 8193856 :

takeWhilegeht fälschlicherweise davon aus, dass eine vorgelagerte Operation die Stornierung unterstützt und honoriert, was leider nicht der Fall ist flatMap.

Erläuterung

Wenn der Stream bestellt wird, takeWhilesollte das erwartete Verhalten angezeigt werden. Dies ist in Ihrem Code nicht ganz der Fall, da Sie verwenden forEach, wodurch auf die Bestellung verzichtet wird. Wenn Sie sich dafür interessieren, was Sie in diesem Beispiel tun, sollten Sie forEachOrderedstattdessen verwenden. Lustige Sache: Das ändert nichts. 🤔

Vielleicht ist der Stream also gar nicht erst bestellt? (In diesem Fall ist das Verhalten in Ordnung .) Wenn Sie eine temporäre Variable für den Stream erstellen, aus dem erstellt wurde, strArrayund überprüfen, ob sie geordnet ist, indem Sie den Ausdruck ((StatefulOp) stream).isOrdered();am Haltepunkt ausführen , werden Sie feststellen, dass er tatsächlich geordnet ist:

String[][] strArray = {{"Sample1", "Sample2"}, {"Sample3", "Sample4", "Sample5"}};

Stream<String> stream = Arrays.stream(strArray)
        .flatMap(indStream -> Arrays.stream(indStream))
        .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"));

// breakpoint here
System.out.println(stream);

Dies bedeutet, dass dies sehr wahrscheinlich ein Implementierungsfehler ist.

In den Code

Wie andere vermutet haben, denke ich jetzt auch, dass dies mit Eifer verbunden sein könnteflatMap . Genauer gesagt können beide Probleme dieselbe Grundursache haben.

Wenn WhileOpswir uns die Quelle von ansehen, können wir folgende Methoden erkennen:

@Override
public void accept(T t) {
    if (take = predicate.test(t)) {
        downstream.accept(t);
    }
}

@Override
public boolean cancellationRequested() {
    return !take || downstream.cancellationRequested();
}

Dieser Code wird verwendet takeWhile, um für ein bestimmtes Stream-Element zu überprüfen, tob das predicateerfüllt ist:

  • downstreamIn diesem Fall wird das Element in diesem Fall an die Operation weitergeleitet System.out::println.
  • Wenn nicht, wird der Wert takeauf false gesetzt. Wenn Sie das nächste Mal gefragt werden, ob die Pipeline abgebrochen werden soll (dh, dies ist erledigt), wird sie zurückgegeben true.

Dies umfasst den takeWhileBetrieb. Das andere, was Sie wissen müssen, ist, dass forEachOrdereddie Terminaloperation die Methode ausführt ReferencePipeline::forEachWithCancel:

@Override
final boolean forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
    boolean cancelled;
    do { } while (
            !(cancelled = sink.cancellationRequested())
            && spliterator.tryAdvance(sink));
    return cancelled;
}

Alles was dies tut ist:

  1. Überprüfen Sie, ob die Pipeline abgebrochen wurde
  2. Wenn nicht, stellen Sie die Spüle um ein Element vor
  3. Hör auf, wenn dies das letzte Element war

Sieht vielversprechend aus, oder?

Ohne flatMap

Im "guten Fall" (ohne flatMap; Ihr zweites Beispiel) wird forEachWithCanceldirekt auf das WhileOpAs gearbeitet sinkund Sie können sehen, wie sich dies auswirkt:

  • ReferencePipeline::forEachWithCancel macht seine Schleife:
    • WhileOps::accept wird jedes Stream-Element gegeben
    • WhileOps::cancellationRequested wird nach jedem Element abgefragt
  • Irgendwann "Sample4"schlägt das Prädikat fehl und der Stream wird abgebrochen

Yay!

Mit flatMap

Im "schlechten Fall" (mit flatMap; Ihrem ersten Beispiel) wird jedoch forEachWithCanceldie flatMapOperation ausgeführt, die einfach forEachRemainingdas ArraySpliteratorfor aufruft {"Sample3", "Sample4", "Sample5"}, was Folgendes bewirkt:

if ((a = array).length >= (hi = fence) &&
    (i = index) >= 0 && i < (index = hi)) {
    do { action.accept((T)a[i]); } while (++i < hi);
}

Wenn Sie all das hiund fencealles ignorieren , was nur verwendet wird, wenn die Array-Verarbeitung für einen parallelen Stream aufgeteilt wird, ist dies eine einfache forSchleife, die jedes Element an die takeWhileOperation übergibt , aber niemals prüft, ob es abgebrochen wird . Es wird daher eifrig alle Elemente in diesem "Teilstrom" durchlaufen, bevor es stoppt, wahrscheinlich sogar durch den Rest des Stroms .

Nicolai Parlog
quelle
17
@ Eugene: Nun, ich wette, es ist mit diesem verbunden . Es hat zufällig für Terminal-Kurzschlussoperationen funktioniert, weil sie überschüssige Elemente ignorieren, aber jetzt haben wir Zwischenkurzschlussoperationen… Es ist also eine gute Nachricht, da dies impliziert, dass jetzt etwas mehr Druck besteht, diesen Fehler zu beheben (miese Leistung oder zu brechen, wenn die Sub-Streams unendlich sind, war anscheinend nicht genug)…
Holger
10
Es wird nicht der gesamte Stream durchlaufen. Wenn das letzte Element eines Sub-Streams mit dem Prädikat übereinstimmt, funktioniert die Abbruchunterstützung des äußeren Streams, z. B. String[][] strArray = { {"Sample1", "Sample2"}, {"Sample3", "Sample4"}, {"Sample5", "Sample6"}, };als Eingabe, und es scheint zu funktionieren. Wenn nur ein Zwischenelement übereinstimmt, führt flatMapdie Unkenntnis zur Löschung dazu, dass das Flag bei der Auswertung des nachfolgenden Elements überschrieben wird.
Holger
@Holger Ich meinte nur "Teilstrom" (was aus meiner Formulierung nicht klar hervorgeht) und dachte nicht einmal daran, "Teilstrom" zu folgen. Der Wortlaut wurde geändert und aus Klarstellung mit Ihrem Kommentar verknüpft.
Nicolai Parlog
16
Scheint, sie haben dich gehört: bugs.openjdk.java.net/browse/JDK-8193856
Stefan Zobel
20

Dies ist ein Fehler, egal wie ich ihn betrachte - und danke Holger für Ihre Kommentare. Ich wollte diese Antwort hier nicht einfügen (ernsthaft!), Aber keine der Antworten besagt eindeutig, dass dies ein Fehler ist.

Die Leute sagen, dass dies mit bestellt / unbestellt sein muss, und dies ist nicht wahr, da dies truedreimal gemeldet wird:

Stream<String[]> s1 = Arrays.stream(strArray);
System.out.println(s1.spliterator().hasCharacteristics(Spliterator.ORDERED));

Stream<String> s2 = Arrays.stream(strArray)
            .flatMap(indStream -> Arrays.stream(indStream));
System.out.println(s2.spliterator().hasCharacteristics(Spliterator.ORDERED));

Stream<String> s3 = Arrays.stream(strArray)
            .flatMap(indStream -> Arrays.stream(indStream))
            .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"));
System.out.println(s3.spliterator().hasCharacteristics(Spliterator.ORDERED));

Es ist auch sehr interessant, wenn Sie es ändern zu:

String[][] strArray = { 
         { "Sample1", "Sample2" }, 
         { "Sample3", "Sample5", "Sample4" }, // Sample4 is the last one here
         { "Sample7", "Sample8" } 
};

dann Sample7und Sample8wird nicht Teil der Ausgabe sein, sonst werden sie. Es scheint, dass ein Abbruch-Flag flatmap ignoriert wird , das von eingeführt würde dropWhile.

Eugene
quelle
11

Wenn Sie sich die DokumentationtakeWhile ansehen für :

Wenn dieser Stream geordnet ist, [gibt] einen Stream zurück, der aus dem längsten Präfix von Elementen besteht, die aus diesem Stream entnommen wurden und mit dem angegebenen Prädikat übereinstimmen.

Wenn dieser Stream ungeordnet ist, gibt [einen] Stream zurück, der aus einer Teilmenge von Elementen besteht, die aus diesem Stream stammen und mit dem angegebenen Prädikat übereinstimmen.

Ihr Stream ist zufällig bestellt, takeWhile weiß aber nicht, dass dies der Fall ist. Als solches gibt es die 2. Bedingung zurück - die Teilmenge. Du takeWhilebenimmst dich nur wie ein filter.

Wenn Sie einen Anruf hinzuzufügen , sortedvor takeWhile, werden Sie sehen das Ergebnis Sie erwarten:

Arrays.stream(strArray)
      .flatMap(indStream -> Arrays.stream(indStream))
      .sorted()
      .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"))
      .forEach(ele -> System.out.println(ele));
Michael
quelle
17
Warum wird es nicht bestellt oder warum weiß es nicht, dass es ist? Die "Verkettung" geordneter Streams sollte geordnet werden, nicht wahr?
JB Nizet
9
@JBNizet aber wenn Sie dann jeden einzelnen Schritt Stream<String[]> s1 = Arrays.stream(strArray); System.out.println(s1.spliterator().hasCharacteristics(Split‌​erator.ORDERED))und so weiter für jeden Schritt machen - sie werden alle einen ORDEREDStream produzieren, sieht dies aus wie ein Fehler, der noch nicht gemeldet wurde
Eugene
8
@ Michael wie ich es sehe (laut vorherigem Kommentar) - Ihre Schlussfolgerung ist falsch für mich
Eugene
10
Aber Takewhile nicht weiß , dass es “ ... na ja , warum ist es nicht wissen , wann der Strom und sein Unterstrom werden bestellt und warum ist .sorted().unordered() .takeWhile(…)immer noch das Richtige dann tun? Ich würde sagen, es liegt daran, dass es sich sortedum eine zustandsbehaftete Operation handelt, die die gesamte Eingabe puffert, gefolgt von einer wirklich faulen Iteration.
Holger
2
"Ihr Stream ist zufällig bestellt, aber takeWhile weiß nicht, dass dies der Fall ist. Als solches gibt es die zweite Bedingung zurück - die Teilmenge. Ihr takeWhile verhält sich nur wie ein Filter.": Aber das klingt wirklich falsch. Wenn der Stream nicht geordnet ist, gibt er seine Elemente in einer unvorhersehbaren Reihenfolge zurück. Nun takeWhilesollten Sie auf die Elemente, die es tatsächlich empfängt, in der Reihenfolge reagieren, in der es sie empfängt, und aufhören, sobald ein Element sein Prädikat nicht erfüllt. Wenn man nach einem ungeordneten Stream filtern möchte, sollte man ihn verwenden filter.
Giorgio
9

Der Grund dafür ist, dass die flatMapOperation auch eine Zwischenoperation ist, bei der (eine von) der zustandsbehafteten Kurzschluss-Zwischenoperation takeWhile verwendet wird.

Das Verhalten flatMapvon Holger in dieser Antwort ist sicherlich eine Referenz, die man sich nicht entgehen lassen sollte, um die unerwartete Ausgabe für solche Kurzschlussoperationen zu verstehen.

Ihr erwartetes Ergebnis kann erzielt werden, indem Sie diese beiden Zwischenoperationen aufteilen, indem Sie eine Terminaloperation einführen, um einen geordneten Stream deterministisch weiter zu verwenden, und sie für eine Stichprobe ausführen als:

List<String> sampleList = Arrays.stream(strArray).flatMap(Arrays::stream).collect(Collectors.toList());
sampleList.stream().takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"))
            .forEach(System.out::println);

Außerdem scheint es einen verwandten Fehler # JDK-8075939 zu geben, um dieses bereits registrierte Verhalten zu verfolgen.

Bearbeiten : Dies kann weiter verfolgt werden, indem JDK-8193856 als Fehler akzeptiert wird.

Naman
quelle
8
Ich verstehe deine Erklärung nicht. Für mich scheint dieses Verhalten ein Fehler zu sein. Für Ihre vorgeschlagene Alternative sind zwei Stream-Pipelines erforderlich, die möglicherweise weniger wünschenswert sind.
Eran
2
@Eran In der Tat scheint das Verhalten wie ein Fehler. Die vorgeschlagene Alternative besteht darin, lediglich eine Terminaloperation einzuführen, um den (Abgas-) flatMapBetrieb abzuschließen, und dann den Stream zur Ausführung zu verarbeiten takeWhile.
Naman