Der effizienteste Weg, um das letzte Element eines Streams zu erhalten

76

Stream hat keine last()Methode:

Stream<T> stream;
T last = stream.last(); // No such method

Was ist der eleganteste und / oder effizienteste Weg, um das letzte Element zu erhalten (oder null für einen leeren Stream)?

Böhmisch
quelle
4
Wenn Sie das letzte Element von a finden müssen Stream, möchten Sie möglicherweise Ihr Design überdenken und wenn Sie wirklich a verwenden möchten Stream. Streams sind nicht unbedingt geordnet oder endlich. Wenn Sie Streamungeordnet, unendlich oder beides sind, hat das letzte Element keine Bedeutung. In meinen Augen geht es bei a Streamdarum, eine Abstraktionsebene zwischen Daten und deren Verarbeitung bereitzustellen. Als solches muss ein Streamselbst nichts über die relative Reihenfolge seiner Elemente wissen. Das letzte Element in a zu finden Streamist O (n). Wenn Sie eine andere Datenstruktur hätten, könnte dies O (1) sein.
Jeffrey
1
@jeff der Bedarf war real: Die Situation bestand darin, Artikel grob in einen Warenkorb zu legen. Jeder Zusatz gab Fehlerinformationen zurück (bestimmte Kombinationen von Artikeln waren nicht gültig), aber nur die Fehlerinformationen des letzten Zusatzes (wenn alle Artikel hinzugefügt worden waren und fair Einschätzung des Wagens möglich war) war die benötigte Information. (Ja, die von uns verwendete API ist defekt und kann nicht repariert werden.)
Böhmisch
14
@BrianGoetz: Unendliche Streams haben auch keine genau definierte count(), aber Stream hat immer noch eine count()Methode. In Wirklichkeit gilt dieses Argument für jede nicht kurzschließende Terminaloperation in unendlichen Strömen.
Jeffrey Bosboom
@BrianGoetz Ich denke, Streams sollten last()Methode haben. Am 1. April könnte es eine Umfrage geben, wie sie für unendliche Streams definiert werden sollte. Ich würde vorschlagen: "Es kehrt nie zurück und verwendet mindestens einen Prozessorkern zu 100%. Bei parallelen Streams müssen alle Kerne zu 100% verwendet werden."
Vojta
Wenn die Liste Objekte mit einer natürlichen Reihenfolge enthält oder die geordnet werden können, können Sie die max()Methode wie in verwenden stream()...max(Comparator...).
Erk

Antworten:

123

Führen Sie eine Reduzierung durch, die einfach den aktuellen Wert zurückgibt:

Stream<T> stream;
T last = stream.reduce((a, b) -> b).orElse(null);
Böhmisch
quelle
2
Würden Sie sagen, dass dies elegant, effizient oder beides war?
Duncan Jones
1
@Duncan Ich denke, es ist beides, aber ich bin noch keine Waffe in Java 8 und dieses Bedürfnis kam neulich bei der Arbeit auf - ein Junior schob den Stream auf einen Stapel und knallte ihn dann, und ich dachte, das sah besser aus, aber da könnte da draußen noch einfacher sein.
Böhmisch
19
Für Einfachheit und Eleganz gewinnt diese Antwort. Und es ist im allgemeinen Fall ziemlich effizient; es wird ziemlich gut parallelisieren. Für einige Stream-Quellen, die ihre Größe kennen, gibt es einen schnelleren Weg, aber in den meisten Fällen ist es den zusätzlichen Code nicht wert, diese wenigen Iterationen zu speichern.
Brian Goetz
1
@BrianGoetz wie wird das gut parallelisieren? Der letzte Wert wird mit einem parallelen Stream nicht vorhersehbar sein
benez
2
@BrianGoetz: Es ist immer noch O(n), auch wenn durch die Anzahl der CPU-Kerne geteilt. Da der Stream nicht weiß, was die Reduktionsfunktion tut, muss er sie dennoch für jedes Element auswerten.
Holger
37

Dies hängt stark von der Art der Stream. Denken Sie daran, dass „einfach“ nicht unbedingt „effizient“ bedeutet. Wenn Sie den Verdacht haben, dass der Stream sehr groß ist, schwere Operationen ausführt oder eine Quelle hat, die die Größe im Voraus kennt, ist Folgendes möglicherweise wesentlich effizienter als die einfache Lösung:

static <T> T getLast(Stream<T> stream) {
    Spliterator<T> sp=stream.spliterator();
    if(sp.hasCharacteristics(Spliterator.SIZED|Spliterator.SUBSIZED)) {
        for(;;) {
            Spliterator<T> part=sp.trySplit();
            if(part==null) break;
            if(sp.getExactSizeIfKnown()==0) {
                sp=part;
                break;
            }
        }
    }
    T value=null;
    for(Iterator<T> it=recursive(sp); it.hasNext(); )
        value=it.next();
    return value;
}

private static <T> Iterator<T> recursive(Spliterator<T> sp) {
    Spliterator<T> prev=sp.trySplit();
    if(prev==null) return Spliterators.iterator(sp);
    Iterator<T> it=recursive(sp);
    if(it!=null && it.hasNext()) return it;
    return recursive(prev);
}

Sie können den Unterschied anhand des folgenden Beispiels veranschaulichen:

String s=getLast(
    IntStream.range(0, 10_000_000).mapToObj(i-> {
        System.out.println("potential heavy operation on "+i);
        return String.valueOf(i);
    }).parallel()
);
System.out.println(s);

Es wird gedruckt:

potential heavy operation on 9999999
9999999

Mit anderen Worten, die Operation wurde nicht für die ersten 9999999-Elemente ausgeführt, sondern nur für das letzte.

Holger
quelle
1
Was ist der Sinn des hasCharacteristics()Blocks? Welchen Wert fügt es hinzu, der nicht bereits von der recursive()Methode abgedeckt wird ? Letzterer navigiert bereits zum letzten Splitpunkt. Darüber hinaus recursive()kann nie zurückkehren, nullso dass Sie den it != nullScheck entfernen können.
Gili
1
Die rekursive Operation kann jeden Fall behandeln, ist jedoch nur ein Fallback, da es im schlimmsten Fall eine Rekursionstiefe gibt, die der Anzahl der (ungefilterten!) Elemente entspricht. Der Idealfall ist ein SUBSIZEDStream, der nicht leere geteilte Hälften garantieren kann, sodass wir nie mehr auf die linke Seite zurückkehren müssen. Beachten Sie, dass dies in diesem Fall recursivenicht der Fall ist, da trySplitsich bereits gezeigt hat, dass es zurückkehrt null.
Holger
2
Natürlich hätte der Code anders geschrieben werden können, und das war es auch. Ich denke, der nullCheck stammt aus einer früheren Version, aber dann habe ich festgestellt, dass man bei Nicht- SUBSIZEDStreams mit möglichen leeren Split-Teilen umgehen muss, dh man muss iterieren, um herauszufinden, ob er Werte hat. Deshalb habe ich den Spliterators.iterator(…)Aufruf in eine recursiveMethode verschoben in der Lage zu sein, auf die linke Seite zu sichern, wenn die rechte Seite leer ist. Die Schleife ist immer noch die bevorzugte Operation.
Holger
2
Interessante Lösung. Beachten Sie, dass gemäß der aktuellen Stream-API-Implementierung Ihr Stream entweder parallel oder direkt mit dem Quellenspliterator verbunden sein muss. Andernfalls wird die Aufteilung aus irgendeinem Grund verweigert, selbst wenn der zugrunde liegende Quellenspliterator aufgeteilt wird. Auf der anderen Seite können Sie nicht blind verwenden, parallel()da dies tatsächlich einige Operationen (wie das Sortieren) parallel ausführen kann, die unerwartet mehr CPU-Kerne verbrauchen.
Tagir Valeev
2
@Tagir Valeev: Richtig, der Beispielcode verwendet .parallel(), aber tatsächlich kann er Auswirkungen auf sorted()oder haben distinct(). Ich denke nicht, dass es einen Effekt für eine der anderen Zwischenoperationen geben sollte ...
Holger
6

Dies ist nur eine Überarbeitung von Holgers Antwort, da der Code zwar fantastisch, aber etwas schwer zu lesen / zu verstehen ist, insbesondere für Leute, die vor Java keine C-Programmierer waren. Hoffentlich ist meine überarbeitete Beispielklasse für diejenigen, die nicht mit Spliteratoren vertraut sind, was sie tun oder wie sie funktionieren, etwas einfacher zu befolgen.

public class LastElementFinderExample {
    public static void main(String[] args){
        String s = getLast(
            LongStream.range(0, 10_000_000_000L).mapToObj(i-> {
                System.out.println("potential heavy operation on "+i);
                return String.valueOf(i);
            }).parallel()
        );
        System.out.println(s);
    }

    public static <T> T getLast(Stream<T> stream){
        Spliterator<T> sp = stream.spliterator();
        if(isSized(sp)) {
            sp = getLastSplit(sp);
        }
        return getIteratorLastValue(getLastIterator(sp));
    }

    private static boolean isSized(Spliterator<?> sp){
        return sp.hasCharacteristics(Spliterator.SIZED|Spliterator.SUBSIZED);
    }

    private static <T> Spliterator<T> getLastSplit(Spliterator<T> sp){
        return splitUntil(sp, s->s.getExactSizeIfKnown() == 0);
    }

    private static <T> Iterator<T> getLastIterator(Spliterator<T> sp) {
        return Spliterators.iterator(splitUntil(sp, null));
    }

    private static <T> T getIteratorLastValue(Iterator<T> it){
        T result = null;
        while (it.hasNext()){
            result = it.next();
        }
        return result;
    }

    private static <T> Spliterator<T> splitUntil(Spliterator<T> sp, Predicate<Spliterator<T>> condition){
        Spliterator<T> result = sp;
        for (Spliterator<T> part = sp.trySplit(); part != null; part = result.trySplit()){
            if (condition == null || condition.test(result)){
                result = part;
            }
        }
        return result;      
    }   
}
Steve K.
quelle
6

Guave hat Streams.findLast :

Stream<T> stream;
T last = Streams.findLast(stream);
Robert Važan
quelle
1
Und es reduce((a, b) -> b)Spliterator.trySplit
funktioniert
1

Hier ist eine andere Lösung (nicht so effizient):

List<String> list = Arrays.asList("abc","ab","cc");
long count = list.stream().count();
list.stream().skip(count-1).findFirst().ifPresent(System.out::println);
Panagdu
quelle
Interessant ... Haben Sie das getestet? Weil es keine substreamMethode gibt und selbst wenn dies der Fall wäre, würde dies nicht funktionieren, da countes sich um eine Terminaloperation handelt. Was ist die Geschichte dahinter?
Lii
Seltsam, ich weiß nicht, was für ein JDK ich habe, aber es hat einen Teilstrom. Ich habe mir das offizielle Javadoc angesehen ( docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html ) und Sie haben Recht, es erscheint hier nicht.
Panagdu
6
Natürlich müssen Sie count==0zuerst prüfen, ob als Eingabe Stream.skipnicht gefällt -1. Außerdem hat die Frage nicht gesagt, dass Sie das Streamzweimal erwerben können . Es wurde auch nicht gesagt, dass der Streamzweimalige Erwerb garantiert die gleiche Anzahl von Elementen erhält.
Holger
1

Parallele Streams ohne Größe mit 'Skip'-Methoden sind schwierig und die Implementierung von @ Holger gibt eine falsche Antwort. Auch die Implementierung von @ Holger ist etwas langsamer, da Iteratoren verwendet werden.

Eine Optimierung von @Holger Antwort:

public static <T> Optional<T> last(Stream<? extends T> stream) {
    Objects.requireNonNull(stream, "stream");

    Spliterator<? extends T> spliterator = stream.spliterator();
    Spliterator<? extends T> lastSpliterator = spliterator;

    // Note that this method does not work very well with:
    // unsized parallel streams when used with skip methods.
    // on that cases it will answer Optional.empty.

    // Find the last spliterator with estimate size
    // Meaningfull only on unsized parallel streams
    if(spliterator.estimateSize() == Long.MAX_VALUE) {
        for (Spliterator<? extends T> prev = spliterator.trySplit(); prev != null; prev = spliterator.trySplit()) {
            lastSpliterator = prev;
        }
    }

    // Find the last spliterator on sized streams
    // Meaningfull only on parallel streams (note that unsized was transformed in sized)
    for (Spliterator<? extends T> prev = lastSpliterator.trySplit(); prev != null; prev = lastSpliterator.trySplit()) {
        if (lastSpliterator.estimateSize() == 0) {
            lastSpliterator = prev;
            break;
        }
    }

    // Find the last element of the last spliterator
    // Parallel streams only performs operation on one element
    AtomicReference<T> last = new AtomicReference<>();
    lastSpliterator.forEachRemaining(last::set);

    return Optional.ofNullable(last.get());
}

Unit-Test mit Junit 5:

@Test
@DisplayName("last sequential sized")
void last_sequential_sized() throws Exception {
    long expected = 10_000_000L;
    AtomicLong count = new AtomicLong();
    Stream<Long> stream = LongStream.rangeClosed(1, expected).boxed();
    stream = stream.skip(50_000).peek(num -> count.getAndIncrement());

    assertThat(Streams.last(stream)).hasValue(expected);
    assertThat(count).hasValue(9_950_000L);
}

@Test
@DisplayName("last sequential unsized")
void last_sequential_unsized() throws Exception {
    long expected = 10_000_000L;
    AtomicLong count = new AtomicLong();
    Stream<Long> stream = LongStream.rangeClosed(1, expected).boxed();
    stream = StreamSupport.stream(((Iterable<Long>) stream::iterator).spliterator(), stream.isParallel());
    stream = stream.skip(50_000).peek(num -> count.getAndIncrement());

    assertThat(Streams.last(stream)).hasValue(expected);
    assertThat(count).hasValue(9_950_000L);
}

@Test
@DisplayName("last parallel sized")
void last_parallel_sized() throws Exception {
    long expected = 10_000_000L;
    AtomicLong count = new AtomicLong();
    Stream<Long> stream = LongStream.rangeClosed(1, expected).boxed().parallel();
    stream = stream.skip(50_000).peek(num -> count.getAndIncrement());

    assertThat(Streams.last(stream)).hasValue(expected);
    assertThat(count).hasValue(1);
}

@Test
@DisplayName("getLast parallel unsized")
void last_parallel_unsized() throws Exception {
    long expected = 10_000_000L;
    AtomicLong count = new AtomicLong();
    Stream<Long> stream = LongStream.rangeClosed(1, expected).boxed().parallel();
    stream = StreamSupport.stream(((Iterable<Long>) stream::iterator).spliterator(), stream.isParallel());
    stream = stream.peek(num -> count.getAndIncrement());

    assertThat(Streams.last(stream)).hasValue(expected);
    assertThat(count).hasValue(1);
}

@Test
@DisplayName("last parallel unsized with skip")
void last_parallel_unsized_with_skip() throws Exception {
    long expected = 10_000_000L;
    AtomicLong count = new AtomicLong();
    Stream<Long> stream = LongStream.rangeClosed(1, expected).boxed().parallel();
    stream = StreamSupport.stream(((Iterable<Long>) stream::iterator).spliterator(), stream.isParallel());
    stream = stream.skip(50_000).peek(num -> count.getAndIncrement());

    // Unfortunately unsized parallel streams does not work very well with skip
    //assertThat(Streams.last(stream)).hasValue(expected);
    //assertThat(count).hasValue(1);

    // @Holger implementation gives wrong answer!!
    //assertThat(Streams.getLast(stream)).hasValue(9_950_000L); //!!!
    //assertThat(count).hasValue(1);

    // This is also not a very good answer better
    assertThat(Streams.last(stream)).isEmpty();
    assertThat(count).hasValue(0);
}

Die einzige Lösung, die beide Szenarien unterstützt, besteht darin, zu vermeiden, dass der letzte Spliterator in nicht dimensionierten parallelen Streams erkannt wird. Die Folge ist, dass die Lösung Operationen an allen Elementen ausführt, aber immer die richtige Antwort gibt.

Beachten Sie, dass in sequentiellen Streams ohnehin Operationen an allen Elementen ausgeführt werden.

public static <T> Optional<T> last(Stream<? extends T> stream) {
    Objects.requireNonNull(stream, "stream");

    Spliterator<? extends T> spliterator = stream.spliterator();

    // Find the last spliterator with estimate size (sized parallel streams)
    if(spliterator.hasCharacteristics(Spliterator.SIZED|Spliterator.SUBSIZED)) {
        // Find the last spliterator on sized streams (parallel streams)
        for (Spliterator<? extends T> prev = spliterator.trySplit(); prev != null; prev = spliterator.trySplit()) {
            if (spliterator.getExactSizeIfKnown() == 0) {
                spliterator = prev;
                break;
            }
        }
    }

    // Find the last element of the spliterator
    //AtomicReference<T> last = new AtomicReference<>();
    //spliterator.forEachRemaining(last::set);

    //return Optional.ofNullable(last.get());

    // A better one that supports native parallel streams
    return (Optional<T>) StreamSupport.stream(spliterator, stream.isParallel())
            .reduce((a, b) -> b);
}

In Bezug auf die Komponententests für diese Implementierung sind die ersten drei Tests genau gleich (sequentiell und parallel parallel). Die Tests für nicht dimensionierte Parallelen sind hier:

@Test
@DisplayName("last parallel unsized")
void last_parallel_unsized() throws Exception {
    long expected = 10_000_000L;
    AtomicLong count = new AtomicLong();
    Stream<Long> stream = LongStream.rangeClosed(1, expected).boxed().parallel();
    stream = StreamSupport.stream(((Iterable<Long>) stream::iterator).spliterator(), stream.isParallel());
    stream = stream.peek(num -> count.getAndIncrement());

    assertThat(Streams.last(stream)).hasValue(expected);
    assertThat(count).hasValue(10_000_000L);
}

@Test
@DisplayName("last parallel unsized with skip")
void last_parallel_unsized_with_skip() throws Exception {
    long expected = 10_000_000L;
    AtomicLong count = new AtomicLong();
    Stream<Long> stream = LongStream.rangeClosed(1, expected).boxed().parallel();
    stream = StreamSupport.stream(((Iterable<Long>) stream::iterator).spliterator(), stream.isParallel());
    stream = stream.skip(50_000).peek(num -> count.getAndIncrement());

    assertThat(Streams.last(stream)).hasValue(expected);
    assertThat(count).hasValue(9_950_000L);
}
Tet
quelle
Beachten Sie, dass Unit-Tests die assertj-Bibliothek verwenden, um die Sprachkompetenz zu verbessern.
Tet
2
Das Problem ist, dass Sie StreamSupport.stream(((Iterable<Long>) stream::iterator).spliterator(), stream.isParallel())einen IterableUmweg machen, der überhaupt keine Eigenschaften aufweist, mit anderen Worten, einen ungeordneten Strom erzeugt. Das Ergebnis hat also nichts mit Parallelität oder Verwendung zu tun skip, sondern nur mit der Tatsache, dass „last“ für einen ungeordneten Stream keine Bedeutung hat, sodass jedes Element ein gültiges Ergebnis ist.
Holger
1

Wir brauchten lasteinen Stream in der Produktion - ich bin mir immer noch nicht sicher, ob wir das wirklich getan haben, aber verschiedene Teammitglieder in meinem Team sagten, wir hätten es aus verschiedenen "Gründen" getan. Am Ende habe ich so etwas geschrieben:

 private static class Holder<T> implements Consumer<T> {

    T t = null;
    // needed to null elements that could be valid
    boolean set = false;

    @Override
    public void accept(T t) {
        this.t = t;
        set = true;
    }
}

/**
 * when a Stream is SUBSIZED, it means that all children (direct or not) are also SIZED and SUBSIZED;
 * meaning we know their size "always" no matter how many splits are there from the initial one.
 * <p>
 * when a Stream is SIZED, it means that we know it's current size, but nothing about it's "children",
 * a Set for example.
 */
private static <T> Optional<Optional<T>> last(Stream<T> stream) {

    Spliterator<T> suffix = stream.spliterator();
    // nothing left to do here
    if (suffix.getExactSizeIfKnown() == 0) {
        return Optional.empty();
    }

    return Optional.of(Optional.ofNullable(compute(suffix, new Holder())));
}


private static <T> T compute(Spliterator<T> sp, Holder holder) {

    Spliterator<T> s;
    while (true) {
        Spliterator<T> prefix = sp.trySplit();
        // we can't split any further
        // BUT don't look at: prefix.getExactSizeIfKnown() == 0 because this
        // does not mean that suffix can't be split even more further down
        if (prefix == null) {
            s = sp;
            break;
        }

        // if prefix is known to have no elements, just drop it and continue with suffix
        if (prefix.getExactSizeIfKnown() == 0) {
            continue;
        }

        // if suffix has no elements, try to split prefix further
        if (sp.getExactSizeIfKnown() == 0) {
            sp = prefix;
        }

        // after a split, a stream that is not SUBSIZED can give birth to a spliterator that is
        if (sp.hasCharacteristics(Spliterator.SUBSIZED)) {
            return compute(sp, holder);
        } else {
            // if we don't know the known size of suffix or prefix, just try walk them individually
            // starting from suffix and see if we find our "last" there
            T suffixResult = compute(sp, holder);
            if (!holder.set) {
                return compute(prefix, holder);
            }
            return suffixResult;
        }


    }

    s.forEachRemaining(holder::accept);
    // we control this, so that Holder::t is only T
    return (T) holder.t;

}

Und einige Verwendungen davon:

    Stream<Integer> st = Stream.concat(Stream.of(1, 2), Stream.empty());
    System.out.println(2 == last(st).get().get());

    st = Stream.concat(Stream.empty(), Stream.of(1, 2));
    System.out.println(2 == last(st).get().get());

    st = Stream.concat(Stream.iterate(0, i -> i + 1), Stream.of(1, 2, 3));
    System.out.println(3 == last(st).get().get());

    st = Stream.concat(Stream.iterate(0, i -> i + 1).limit(0), Stream.iterate(5, i -> i + 1).limit(3));
    System.out.println(7 == last(st).get().get());

    st = Stream.concat(Stream.iterate(5, i -> i + 1).limit(3), Stream.iterate(0, i -> i + 1).limit(0));
    System.out.println(7 == last(st).get().get());

    String s = last(
        IntStream.range(0, 10_000_000).mapToObj(i -> {
            System.out.println("potential heavy operation on " + i);
            return String.valueOf(i);
        }).parallel()
    ).get().get();

    System.out.println(s.equalsIgnoreCase("9999999"));

    st = Stream.empty();
    System.out.println(last(st).isEmpty());

    st = Stream.of(1, 2, 3, 4, null);
    System.out.println(last(st).get().isEmpty());

    st = Stream.of((Integer) null);
    System.out.println(last(st).isPresent());

    IntStream is = IntStream.range(0, 4).filter(i -> i != 3);
    System.out.println(last(is.boxed()));

Erstens ist der Rückgabetyp Optional<Optional<T>>- es sieht komisch aus , da stimme ich zu. Wenn das erste Optionalleer ist, bedeutet dies, dass der Stream keine Elemente enthält. Wenn das zweite Optionale leer ist, bedeutet dies, dass das letzte Element tatsächlich war null, dh: Stream.of(1, 2, 3, null)(im Gegensatz zu dem guava, Streams::findLastdas in einem solchen Fall eine Ausnahme auslöst).

Ich gebe zu, ich habe mich hauptsächlich von Holgers Antwort auf eine ähnliche Frage wie meine und die von Guaven inspirieren lassen Streams::findLast.

Eugene
quelle