Ich möchte einen Java 8-Stream duplizieren, damit ich zweimal damit umgehen kann. Ich kann collect
als Liste und neue Streams daraus bekommen;
// doSomething() returns a stream
List<A> thing = doSomething().collect(toList());
thing.stream()... // do stuff
thing.stream()... // do other stuff
Aber ich denke, es sollte einen effizienteren / eleganteren Weg geben.
Gibt es eine Möglichkeit, den Stream zu kopieren, ohne ihn in eine Sammlung umzuwandeln?
Ich arbeite tatsächlich mit einem Stream von Either
s, möchte also die linke Projektion auf eine Weise verarbeiten, bevor ich auf die rechte Projektion übergehe und auf eine andere Weise damit umgehe. Ein bisschen wie dieses (mit dem ich bisher gezwungen bin, den toList
Trick anzuwenden ).
List<Either<Pair<A, Throwable>, A>> results = doSomething().collect(toList());
Stream<Pair<A, Throwable>> failures = results.stream().flatMap(either -> either.left());
failures.forEach(failure -> ... );
Stream<A> successes = results.stream().flatMap(either -> either.right());
successes.forEach(success -> ... );
java
lambda
java-8
java-stream
Toby
quelle
quelle
Antworten:
Ich denke, Ihre Annahme über Effizienz ist etwas rückwärts. Sie erhalten diese enorme Amortisation, wenn Sie die Daten nur einmal verwenden, da Sie sie nicht speichern müssen. Streams bieten Ihnen leistungsstarke "Loop Fusion" -Optimierungen, mit denen Sie die gesamten Daten effizient durch die Pipeline fließen lassen können.
Wenn Sie dieselben Daten wiederverwenden möchten, müssen Sie sie per Definition entweder zweimal (deterministisch) generieren oder speichern. Wenn es sich bereits in einer Sammlung befindet, großartig; dann ist es billig, es zweimal zu wiederholen.
Wir haben im Design mit "gegabelten Streams" experimentiert. Was wir fanden, war, dass die Unterstützung echte Kosten hatte; es belastete den allgemeinen Fall (einmalige Verwendung) auf Kosten des ungewöhnlichen Falls. Das große Problem bestand darin, "was passiert, wenn die beiden Pipelines nicht mit der gleichen Rate Daten verbrauchen". Jetzt sind Sie sowieso wieder beim Puffern. Dies war ein Merkmal, das offensichtlich nicht das Gewicht hatte.
Wenn Sie dieselben Daten wiederholt bearbeiten möchten, speichern Sie sie entweder oder strukturieren Sie Ihre Vorgänge als Verbraucher und gehen Sie wie folgt vor:
Sie können sich auch die RxJava-Bibliothek ansehen, da sich das Verarbeitungsmodell besser für diese Art von "Stream Forking" eignet.
quelle
toList
) speichere , um sie verarbeiten zu können (derEither
Fall) als Beispiel)?Sie können eine lokale Variable mit a verwenden
Supplier
, um allgemeine Teile der Stream-Pipeline einzurichten.Von http://winterbe.com/posts/2014/07/31/java8-stream-tutorial-examples/ :
quelle
Supplier
wenn dasStream
mit einer "kostspieligen" Weise gebaut wird, zahlen Sie diese Kosten für jeden Anruf anSupplier.get()
. dh wenn eine Datenbankabfrage ... diese Abfrage jedes Mal durchgeführt wirdSet<Integer>
using konvertieren musstecollect(Collectors.toSet())
... und ein paar Operationen daran durchführen musste. Ich wolltemax()
und wenn ein bestimmter Wert als zwei Operationen festgelegt wurde ...filter(d -> d == -1).count() == 1;
Verwenden Sie a
Supplier
, um den Stream für jede Beendigungsoperation zu erstellen.Wenn Sie einen Stream dieser Sammlung benötigen
streamSupplier.get()
, können Sie einen neuen Stream abrufen.Beispiele:
streamSupplier.get().anyMatch(predicate);
streamSupplier.get().allMatch(predicate2);
quelle
Wir haben eine
duplicate()
Methode für Streams in jOOλ implementiert , einer Open Source-Bibliothek, die wir erstellt haben, um die Integrationstests für jOOQ zu verbessern . Im Wesentlichen können Sie einfach schreiben:Intern gibt es einen Puffer, in dem alle Werte gespeichert sind, die von einem Stream verbraucht wurden, aber nicht von dem anderen. Das ist wahrscheinlich so effizient wie es nur geht, wenn Ihre beiden Streams ungefähr gleich schnell verbraucht werden und wenn Sie mit dem Mangel an Thread-Sicherheit leben können .
So funktioniert der Algorithmus:
Mehr Quellcode hier
Tuple2
ist wahrscheinlich wie IhrPair
während Typ,Seq
istStream
mit einigen Verbesserungen.quelle
Tuple2<Seq<A>>, Seq<A>> t = duplicate(stream); long count = t.collect(counting()); List<A> list = t.collect(toList());
, ist es besserTuple2<Long, List<A>> t = stream.collect(Tuple.collectors(counting(), toList()));
. Die Verwendung vonCollectors.mapping/reducing
one kann andere Stream-Operationen als Kollektoren und Prozesselemente auf ganz andere Weise ausdrücken und ein einzelnes resultierendes Tupel erzeugen. Im Allgemeinen können Sie also viele Dinge tun, indem Sie den Stream einmal ohne Duplizierung verbrauchen, und er ist parallel.offer()
/poll()
API verwendet, aber esArrayDeque
könnte genauso sein.Sie können beispielsweise einen Stream mit ausführbaren Dateien erstellen:
Wo
failure
und wosuccess
sind die Operationen anzuwenden? Dies erstellt jedoch einige temporäre Objekte und ist möglicherweise nicht effizienter, als von einer Sammlung auszugehen und sie zweimal zu streamen / zu iterieren.quelle
Eine andere Möglichkeit, die Elemente mehrmals zu behandeln, ist die Verwendung von Stream.peek (Consumer) :
peek(Consumer)
kann so oft wie nötig verkettet werden.quelle
cyclops-react , eine Bibliothek, zu der ich beitrage, verfügt über eine statische Methode, mit der Sie einen Stream duplizieren können (und ein jOOλ-Tupel von Streams zurückgibt).
Siehe Kommentare, es gibt Leistungseinbußen, die auftreten, wenn Duplikate in einem vorhandenen Stream verwendet werden. Eine leistungsfähigere Alternative wäre die Verwendung von Streamable:
Es gibt auch eine (faule) Streamable-Klasse, die aus einem Stream, Iterable oder Array erstellt und mehrmals wiedergegeben werden kann.
AsStreamable.synchronizedFromStream (Stream) - kann verwendet werden, um ein Streamable zu erstellen, das die Backing-Sammlung träge auf eine Weise auffüllt, die für mehrere Threads freigegeben werden kann. Streamable.fromStream (Stream) verursacht keinen Synchronisationsaufwand.
quelle
List<Integer> list = stream.collect(Collectors.toList()); streams = new Tuple2<>(list.stream(), list.stream())
(wie OP vorschlägt). Bitte geben Sie in der Antwort auch ausdrücklich an, dass Sie der Autor von Cyclop-Streams sind. Lesen Sie dies .Für dieses spezielle Problem können Sie auch die Partitionierung verwenden. Etwas wie
quelle
Wir können Stream Builder zum Zeitpunkt des Lesens oder Iterierens eines Streams verwenden. Hier ist das Dokument von Stream Builder .
https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.Builder.html
Anwendungsfall
Angenommen, wir haben einen Mitarbeiter-Stream und müssen diesen Stream verwenden, um Mitarbeiterdaten in eine Excel-Datei zu schreiben und dann die Mitarbeitersammlung / -tabelle zu aktualisieren. [Dies ist nur ein Anwendungsfall, um die Verwendung von Stream Builder zu zeigen]:
quelle
Ich hatte ein ähnliches Problem und konnte mir drei verschiedene Zwischenstrukturen vorstellen, aus denen ich eine Kopie des Streams erstellen konnte: a
List
, ein Array und aStream.Builder
. Ich habe ein kleines Benchmark-Programm geschrieben, das darauf hinwies, dass es aus Sicht der LeistungList
etwa 30% langsamer war als die beiden anderen, die ziemlich ähnlich waren.Der einzige Nachteil beim Konvertieren in ein Array besteht darin, dass es schwierig ist, wenn Ihr Elementtyp ein generischer Typ ist (was in meinem Fall der Fall war). deshalb bevorzuge ich a
Stream.Builder
.Am Ende habe ich eine kleine Funktion geschrieben, die Folgendes erzeugt
Collector
:Ich kann dann eine Kopie jedes Streams erstellen,
str
indem ichstr.collect(copyCollector())
dies tue, was der idiomatischen Verwendung von Streams entspricht.quelle