Unterschied zwischen Java 8-Streams und RxJava-Observablen

144

Sind Java 8-Streams RxJava-Observablen ähnlich?

Java 8-Stream-Definition:

Klassen im neuen java.util.streamPaket bieten eine Stream-API zur Unterstützung von Operationen im Funktionsstil für Streams von Elementen.

rahulrv
quelle
8
Zu Ihrer Information gibt es Vorschläge, mehr RxJava-ähnliche Klassen in JDK 9 einzuführen
John Vint
@ JohnVint Wie ist der Status dieses Vorschlags? Wird es tatsächlich fliegen?
Igor Ganapolsky
2
@IgorGanapolsky Oh ja, es sieht definitiv so aus, als würde es in jdk9 kommen. cr.openjdk.java.net/~martin/webrevs/openjdk9/… . Es gibt sogar einen Port für RxJava zum Flow github.com/akarnokd/RxJavaUtilConcurrentFlow .
John Vint
Ich weiß, dass dies eine sehr alte Frage ist, aber ich habe kürzlich an diesem großartigen Vortrag von Venkat Subramaniam teilgenommen, der eine aufschlussreiche Sicht auf das Thema hat und auf Java9 aktualisiert wurde: youtube.com/watch?v=kfSSKM9y_0E . Könnte für Leute interessant sein, die sich mit RxJava beschäftigen.
Pedro

Antworten:

152

TL; DR : Alle Sequenz- / Stream-Verarbeitungsbibliotheken bieten eine sehr ähnliche API für den Pipeline-Aufbau. Die Unterschiede liegen in der API für die Handhabung von Multithreading und die Zusammensetzung von Pipelines.

RxJava unterscheidet sich stark von Stream. Von allen JDK-Dingen ist rava.Observable möglicherweise die Kombination java.util.stream.Collector Stream + CompletableFuture am nächsten (was mit dem Umgang mit einer zusätzlichen Monadenebene verbunden ist, dh die Konvertierung zwischen Stream<CompletableFuture<T>>und handhaben muss CompletableFuture<Stream<T>>).

Es gibt signifikante Unterschiede zwischen Observable und Stream:

  • Streams sind Pull-basiert, Observables sind Push-basiert. Das mag zu abstrakt klingen, hat aber erhebliche Konsequenzen, die sehr konkret sind.
  • Stream kann nur einmal verwendet werden, Observable kann mehrmals abonniert werden
  • Stream#parallel()teilt die Sequenz in Partitionen auf Observable#subscribeOn()und Observable#observeOn()tut dies nicht; Es ist schwierig, das Stream#parallel()Verhalten mit Observable zu emulieren . Es hatte einmal eine .parallel()Methode, aber diese Methode verursachte so viel Verwirrung, dass die .parallel()Unterstützung in ein separates Repository auf Github, RxJavaParallel, verschoben wurde. Weitere Details finden Sie in einer anderen Antwort .
  • Stream#parallel()Im Gegensatz zu den meisten RxJava-Methoden, die optionalen Scheduler akzeptieren, kann kein zu verwendender Thread-Pool angegeben werden. Da alle Stream-Instanzen in einer JVM denselben Fork-Join-Pool verwenden, .parallel()kann das Hinzufügen versehentlich das Verhalten in einem anderen Modul Ihres Programms beeinflussen
  • Ströme fehlen zeitbezogene Operationen wie Observable#interval(), Observable#window()und viele andere; Dies liegt hauptsächlich daran, dass Streams Pull-basiert sind und Upstream keine Kontrolle darüber hat, wann das nächste Element Downstream ausgegeben werden soll
  • Streams bieten im Vergleich zu RxJava eingeschränkte Operationen. Zum Beispiel fehlen Streams Abschaltoperationen ( takeWhile(), takeUntil()); Die Problemumgehung Stream#anyMatch()ist begrenzt: Es handelt sich um einen Terminalbetrieb, sodass Sie ihn nur einmal pro Stream verwenden können
  • Ab JDK 8 gibt es keine Stream # -Zip-Operation, was manchmal sehr nützlich ist
  • Streams sind schwer selbst zu erstellen. Observable kann auf viele Arten erstellt werden. BEARBEITEN: Wie in den Kommentaren erwähnt, gibt es Möglichkeiten, . Da es jedoch keinen nicht-terminalen Kurzschluss gibt, können Sie z. B. nicht einfach einen Zeilenstrom in einer Datei generieren (JDK bietet jedoch sofort die Zeilen "Files #" und "BufferedReader #" an, und andere ähnliche Szenarien können durch Erstellen eines Streams verwaltet werden von Iterator).
  • Observable bietet Ressourcenverwaltungsfunktion ( Observable#using()); Sie können IO-Stream oder Mutex damit umschließen und sicherstellen, dass der Benutzer nicht vergisst, die Ressource freizugeben. Sie wird bei Beendigung des Abonnements automatisch entsorgt. Stream hat eine onClose(Runnable)Methode, aber Sie müssen sie manuell oder über Try-with-Resources aufrufen. Z.B. Sie müssen bedenken, dass Files # lines () muss in Try-mit-Ressourcen - Block eingeschlossen werden.
  • Observables werden vollständig synchronisiert (ich habe nicht überprüft, ob dies auch für Streams gilt). Dies erspart Ihnen die Überlegung, ob grundlegende Vorgänge threadsicher sind (die Antwort lautet immer "Ja", es sei denn, es liegt ein Fehler vor), aber der Aufwand für die Parallelität ist vorhanden, unabhängig davon, ob Ihr Code ihn benötigt oder nicht.

Zusammenfassung: RxJava unterscheidet sich erheblich von Streams. Echte RxJava-Alternativen sind andere Implementierungen von ReactiveStreams , z. B. ein relevanter Teil von Akka.

Update . Es gibt einen Trick, für den ein nicht standardmäßiger Fork-Join-Pool verwendet werden kann Stream#parallel, siehe Benutzerdefinierter Thread-Pool im parallelen Java 8-Stream

Update . All dies basiert auf den Erfahrungen mit RxJava 1.x. Jetzt, da RxJava 2.x hier ist , ist diese Antwort möglicherweise veraltet.

Kirill Gamazkov
quelle
2
Warum sind Streams schwer zu konstruieren? Laut diesem Artikel scheint es einfach: oracle.com/technetwork/articles/java/…
IgorGanapolsky
2
Es gibt eine ganze Reihe von Klassen mit der Stream-Methode: Sammlungen, Eingabestreams, Verzeichnisdateien usw. Aber was ist, wenn Sie einen Stream aus einer benutzerdefinierten Schleife erstellen möchten - beispielsweise über den Datenbankcursor iterieren? Der beste Weg, den ich bisher gefunden habe, besteht darin, einen Iterator zu erstellen, ihn mit Spliterator zu verpacken und schließlich StreamSupport # fromSpliterator aufzurufen. Zu viel Kleber für einen einfachen Fall IMHO. Es gibt auch Stream.iterate, aber es erzeugt einen unendlichen Stream. Die einzige Möglichkeit, Sream in diesem Fall auszuschalten, ist Stream # anyMatch, aber es ist eine Terminaloperation, daher können Sie Stream-Produzent und
-Konsument
2
RxJava hat Observable.fromCallable, Observable.create und so weiter. Oder Sie können sicher unendlich Observable produzieren und dann '.takeWhile (Bedingung)' sagen, und Sie können diese Sequenz an die Verbraucher
versenden
1
Streams sind nicht schwer selbst zu erstellen. Sie können einfach Stream.generate()Ihre eigene Supplier<U>Implementierung aufrufen und übergeben , nur eine einfache Methode, mit der Sie das nächste Element im Stream bereitstellen. Es gibt viele andere Methoden. Um eine Sequenz zu erstellen Stream, die von vorherigen Werten abhängt, können Sie die interate()Methode verwenden. Jede Collectionhat eine stream()Methode und erstellt Stream.of()eine Streamaus einem varargs oder Array. StreamSupportUnterstützt schließlich die erweiterte Stream-Erstellung mithilfe von Spliteratoren oder primitiven Stream-Typen.
jbx
"Streams fehlen Abschaltoperationen ( takeWhile(), takeUntil());" - JDK9 hat diese, glaube ich, in takeWhile () und dropWhile ()
Abdul
50

Java 8 Stream und RxJava sehen ziemlich ähnlich aus. Sie haben ähnliche Operatoren (Filter, Map, FlatMap ...), sind jedoch nicht für die gleiche Verwendung ausgelegt.

Sie können Asynchonus-Aufgaben mit RxJava ausführen.

Mit dem Java 8-Stream durchlaufen Sie Elemente Ihrer Sammlung.

Sie können in RxJava (Durchlaufen von Elementen einer Sammlung) so ziemlich dasselbe tun, aber da sich RxJava auf gleichzeitige Aufgaben konzentriert, ... verwendet es Synchronisation, Latch, ... Die gleiche Aufgabe mit RxJava kann also langsamer sein als mit Java 8 Stream.

RxJava kann mit verglichen werden CompletableFuture, aber das kann mehr als nur einen Wert berechnen.

dwursteisen
quelle
12
Es ist erwähnenswert, dass Ihre Aussage zur Stream-Durchquerung nur für einen nicht parallelen Stream gilt. parallelStreamunterstützt ähnliche Synchronisation von einfachen Durchquerungen / Karten / Filtern usw.
John Vint
2
Ich glaube nicht, dass "dieselbe Aufgabe mit RxJava langsamer sein kann als mit Java 8-Stream." gilt universell, es hängt stark von der jeweiligen Aufgabe ab.
Daschl
1
Ich bin froh, dass Sie gesagt haben, dass dieselbe Aufgabe mit RxJava möglicherweise langsamer ist als mit Java 8-Stream . Dies ist eine sehr wichtige Unterscheidung, die vielen RxJava-Benutzern nicht bekannt ist.
IgorGanapolsky
RxJava ist standardmäßig synchron. Haben Sie Benchmarks, die Ihre Aussage stützen, dass sie möglicherweise langsamer ist?
Marcin Koziński
6
@ marcin-koziński Sie können diesen Benchmark überprüfen: twitter.com/akarnokd/status/752465265091309568
dwursteisen
37

Es gibt einige technische und konzeptionelle Unterschiede, z. B. sind Java 8-Streams Pull-basierte, synchrone Wertesequenzen für den einmaligen Gebrauch, während RxJava Observables wieder beobachtbare, adaptiv Push-Pull-basierte, möglicherweise asynchrone Wertesequenzen sind. RxJava richtet sich an Java 6+ und funktioniert auch unter Android.

akarnokd
quelle
4
Typischer Code mit RxJava verwendet häufig Lambdas, die nur ab Java 8 verfügbar sind. Sie können also Rx mit Java 6 verwenden, aber der Code wird verrauscht sein
Kirill Gamazkov
1
Ein ähnlicher Unterschied besteht darin, dass Rx Observables unbegrenzt am Leben bleiben können, bis sie sich abmelden. Java 8-Streams werden standardmäßig mit Vorgängen beendet.
Igor Ganapolsky
2
@ KirillGamazkov Sie können Retrolambda verwenden , um Ihren Code schöner zu machen, wenn Sie auf Java 6 abzielen.
Marcin Koziński
Kotlin sieht noch sexy aus als nachgerüstet
Kirill Gamazkov
30

Java 8-Streams basieren auf Pulls. Sie iterieren über einen Java 8-Stream, der jedes Element verbraucht. Und es könnte ein endloser Strom sein.

RXJava Observablebasiert standardmäßig auf Push. Sie abonnieren ein Observable und werden benachrichtigt, wenn der nächste Artikel eintrifft (onNext ), wenn der Stream abgeschlossen ist ( onCompleted) oder wenn ein Fehler aufgetreten ist ( onError). Denn mit ObservableIhnen erhalten onNext, onCompleted, onErrorVeranstaltungen, können Sie einige leistungsfähigen Funktionen tun wie andere Kombination von Observables zu einem neuen ( zip, merge,concat ). Sie können auch zwischenspeichern, drosseln, ... und es wird mehr oder weniger dieselbe API in verschiedenen Sprachen verwendet (RxJava, RX in C #, RxJS, ...).

Standardmäßig ist RxJava Single-Threaded. Wenn Sie keine Scheduler verwenden, geschieht alles im selben Thread.

Bart De Neuter
quelle
in Stream haben Sie für jeden, das ist so ziemlich das gleiche wie am nächsten
paul
Tatsächlich sind Streams normalerweise terminal. "Operationen, die eine Stream-Pipeline schließen, werden als Terminal-Operationen bezeichnet. Sie erzeugen ein Ergebnis aus einer Pipeline wie einer Liste, einer Ganzzahl oder sogar einer Leere (ein beliebiger Nicht-Stream-Typ)." ~ oracle.com/technetwork/articles/java/…
IgorGanapolsky
26

Die vorhandenen Antworten sind umfassend und korrekt, aber ein klares Beispiel für Anfänger fehlt. Gestatten Sie mir, einige konkrete Begriffe wie "Push / Pull-basiert" und "wieder beobachtbar" zu formulieren. Hinweis : Ich hasse den BegriffObservable (es ist ein Stream um Himmels willen), daher beziehe ich mich einfach auf J8- und RX-Streams.

Betrachten Sie eine Liste von ganzen Zahlen,

digits = [1,2,3,4,5]

Ein J8-Stream ist ein Dienstprogramm zum Ändern der Sammlung. Zum Beispiel können sogar Ziffern extrahiert werden als:

evens = digits.stream().filter(x -> x%2).collect(Collectors.toList())

Dies ist im Grunde Pythons Map, Filter, Reduce , eine sehr schöne (und längst überfällige) Ergänzung zu Java. Aber was wäre, wenn die Ziffern nicht im Voraus gesammelt würden - was wäre, wenn die Ziffern während der Ausführung der App eingehen würden - könnten wir die Geraden in Echtzeit filtern.

Stellen Sie sich vor, ein separater Thread-Prozess gibt zu zufälligen Zeiten Ganzzahlen aus, während die App ausgeführt wird ( ---bezeichnet die Zeit).

digits = 12345---6------7--8--9-10--------11--12

evenKann in RX auf jede neue Ziffer reagieren und den Filter in Echtzeit anwenden

even = -2-4-----6---------8----10------------12

Es ist nicht erforderlich, Eingabe- und Ausgabelisten zu speichern. Wenn Sie eine Ausgabeliste möchten , ist dies kein Problem, das auch gestreamt werden kann. In der Tat ist alles ein Strom.

evens_stored = even.collect()  

Aus diesem Grund werden Begriffe wie "zustandslos" und "funktional" eher mit RX assoziiert

Adam Hughes
quelle
Aber 5 ist nicht einmal ... Und das sieht so aus, als ob J8 Stream synchron ist, während Rx Stream asynchron ist?
Franklin Yu
1
@FranklinYu danke ich habe den 5 Tippfehler behoben. Wenn Sie weniger an synchrone vs asyncrhouns denken, obwohl es richtig sein mag, und mehr an imperative vs funktionale. In J8 sammeln Sie zuerst alle Ihre Elemente und wenden dann den Filter an. In RX definieren Sie die Filterfunktion unabhängig von den Daten und ordnen sie dann einer geraden Quelle (einem Live-Stream oder einer Java-Sammlung) zu ... es ist ein völlig anderes Programmiermodell
Adam Hughes
Das überrascht mich sehr. Ich bin mir ziemlich sicher, dass Java-Streams aus Daten-Streaming bestehen können. Warum denken Sie das Gegenteil?
Vic Seedoubleyew
4

RxJava ist auch eng mit der Initiative für reaktive Streams verbunden und betrachtet sich selbst als eine einfache Implementierung der API für reaktive Streams (z. B. im Vergleich zur Implementierung von Akka-Streams ). Der Hauptunterschied besteht darin, dass die reaktiven Ströme so ausgelegt sind, dass sie mit Gegendruck umgehen können. Wenn Sie sich jedoch die Seite mit den reaktiven Strömen ansehen, werden Sie auf die Idee kommen. Sie beschreiben ihre Ziele ziemlich gut und die Streams sind auch eng mit dem reaktiven Manifest verbunden .

Die Java 8-Streams sind so ziemlich die Implementierung einer unbegrenzten Sammlung, die dem Scala Stream oder der Clojure Lazy Seq ziemlich ähnlich ist .

Niclas Meier
quelle
3

Java 8-Streams ermöglichen die effiziente Verarbeitung sehr großer Sammlungen unter Nutzung von Multicore-Architekturen. Im Gegensatz dazu ist RxJava standardmäßig Single-Threaded (ohne Scheduler). Daher nutzt RxJava Multi-Core-Maschinen nur, wenn Sie diese Logik selbst codieren.

IgorGanapolsky
quelle
4
Stream ist standardmäßig auch Single-Threaded, es sei denn, Sie rufen .parallel () auf. Außerdem bietet Rx mehr Kontrolle über die Parallelität.
Kirill Gamazkov
@KirillGamazkov Kotlin Coroutines Flow (basierend auf Java8-Streams) unterstützt jetzt strukturierte Parallelität: kotlinlang.org/docs/reference/coroutines/flow.html#flows
IgorGanapolsky
Stimmt, aber ich habe nichts über Flow und strukturierte Parallelität gesagt. Meine beiden Punkte waren: 1) Sowohl Stream als auch Rx sind Single-Threaded, es sei denn, Sie ändern dies ausdrücklich. 2) Rx gibt Ihnen eine genaue Kontrolle darüber, welcher Schritt für welchen Thread-Pool ausgeführt werden soll, im Gegensatz zu Streams, bei denen Sie nur sagen können, dass Sie es irgendwie parallel machen
Kirill Gamazkov,
Ich verstehe die Frage "Wofür brauchst du einen Thread-Pool?" Nicht wirklich. Wie Sie gesagt haben, "um die Verarbeitung wirklich großer Sammlungen effizient zu ermöglichen". Oder ich möchte, dass ein E / A-gebundener Teil der Aufgabe in einem separaten Thread-Pool ausgeführt wird. Ich glaube nicht, dass ich die Absicht hinter Ihrer Frage verstanden habe. Versuch es noch einmal?
Kirill Gamazkov
1
Mit statischen Methoden in der Schedulers-Klasse können vordefinierte Thread-Pools abgerufen und von Executor erstellt werden. Siehe reactivex.io/RxJava/2.x/javadoc/io/reactivex/schedulers/…
Kirill Gamazkov