Sind Java 8-Streams RxJava-Observablen ähnlich?
Java 8-Stream-Definition:
Klassen im neuen
java.util.stream
Paket bieten eine Stream-API zur Unterstützung von Operationen im Funktionsstil für Streams von Elementen.
java-8
java-stream
rx-java
observable
rahulrv
quelle
quelle
Antworten:
TL; DR : Alle Sequenz- / Stream-Verarbeitungsbibliotheken bieten eine sehr ähnliche API für den Pipeline-Aufbau. Die Unterschiede liegen in der API für die Handhabung von Multithreading und die Zusammensetzung von Pipelines.
RxJava unterscheidet sich stark von Stream. Von allen JDK-Dingen ist rava.Observable möglicherweise die Kombination
java.util.stream.CollectorStream + CompletableFuture am nächsten (was mit dem Umgang mit einer zusätzlichen Monadenebene verbunden ist, dh die Konvertierung zwischenStream<CompletableFuture<T>>
und handhaben mussCompletableFuture<Stream<T>>
).Es gibt signifikante Unterschiede zwischen Observable und Stream:
Stream#parallel()
teilt die Sequenz in Partitionen aufObservable#subscribeOn()
undObservable#observeOn()
tut dies nicht; Es ist schwierig, dasStream#parallel()
Verhalten mit Observable zu emulieren . Es hatte einmal eine.parallel()
Methode, aber diese Methode verursachte so viel Verwirrung, dass die.parallel()
Unterstützung in ein separates Repository auf Github, RxJavaParallel, verschoben wurde. Weitere Details finden Sie in einer anderen Antwort .Stream#parallel()
Im Gegensatz zu den meisten RxJava-Methoden, die optionalen Scheduler akzeptieren, kann kein zu verwendender Thread-Pool angegeben werden. Da alle Stream-Instanzen in einer JVM denselben Fork-Join-Pool verwenden,.parallel()
kann das Hinzufügen versehentlich das Verhalten in einem anderen Modul Ihres Programms beeinflussenObservable#interval()
,Observable#window()
und viele andere; Dies liegt hauptsächlich daran, dass Streams Pull-basiert sind und Upstream keine Kontrolle darüber hat, wann das nächste Element Downstream ausgegeben werden solltakeWhile()
,takeUntil()
); Die ProblemumgehungStream#anyMatch()
ist begrenzt: Es handelt sich um einen Terminalbetrieb, sodass Sie ihn nur einmal pro Stream verwenden könnenStreams sind schwer selbst zu erstellen. Observable kann auf viele Arten erstellt werden.BEARBEITEN: Wie in den Kommentaren erwähnt, gibt es Möglichkeiten, . Da es jedoch keinen nicht-terminalen Kurzschluss gibt, können Sie z. B. nicht einfach einen Zeilenstrom in einer Datei generieren (JDK bietet jedoch sofort die Zeilen "Files #" und "BufferedReader #" an, und andere ähnliche Szenarien können durch Erstellen eines Streams verwaltet werden von Iterator).Observable#using()
); Sie können IO-Stream oder Mutex damit umschließen und sicherstellen, dass der Benutzer nicht vergisst, die Ressource freizugeben. Sie wird bei Beendigung des Abonnements automatisch entsorgt. Stream hat eineonClose(Runnable)
Methode, aber Sie müssen sie manuell oder über Try-with-Resources aufrufen. Z.B. Sie müssen bedenken, dass Files # lines () muss in Try-mit-Ressourcen - Block eingeschlossen werden.Zusammenfassung: RxJava unterscheidet sich erheblich von Streams. Echte RxJava-Alternativen sind andere Implementierungen von ReactiveStreams , z. B. ein relevanter Teil von Akka.
Update . Es gibt einen Trick, für den ein nicht standardmäßiger Fork-Join-Pool verwendet werden kann
Stream#parallel
, siehe Benutzerdefinierter Thread-Pool im parallelen Java 8-StreamUpdate . All dies basiert auf den Erfahrungen mit RxJava 1.x. Jetzt, da RxJava 2.x hier ist , ist diese Antwort möglicherweise veraltet.
quelle
Stream.generate()
Ihre eigeneSupplier<U>
Implementierung aufrufen und übergeben , nur eine einfache Methode, mit der Sie das nächste Element im Stream bereitstellen. Es gibt viele andere Methoden. Um eine Sequenz zu erstellenStream
, die von vorherigen Werten abhängt, können Sie dieinterate()
Methode verwenden. JedeCollection
hat einestream()
Methode und erstelltStream.of()
eineStream
aus einem varargs oder Array.StreamSupport
Unterstützt schließlich die erweiterte Stream-Erstellung mithilfe von Spliteratoren oder primitiven Stream-Typen.takeWhile()
,takeUntil()
);" - JDK9 hat diese, glaube ich, in takeWhile () und dropWhile ()Java 8 Stream und RxJava sehen ziemlich ähnlich aus. Sie haben ähnliche Operatoren (Filter, Map, FlatMap ...), sind jedoch nicht für die gleiche Verwendung ausgelegt.
Sie können Asynchonus-Aufgaben mit RxJava ausführen.
Mit dem Java 8-Stream durchlaufen Sie Elemente Ihrer Sammlung.
Sie können in RxJava (Durchlaufen von Elementen einer Sammlung) so ziemlich dasselbe tun, aber da sich RxJava auf gleichzeitige Aufgaben konzentriert, ... verwendet es Synchronisation, Latch, ... Die gleiche Aufgabe mit RxJava kann also langsamer sein als mit Java 8 Stream.
RxJava kann mit verglichen werden
CompletableFuture
, aber das kann mehr als nur einen Wert berechnen.quelle
parallelStream
unterstützt ähnliche Synchronisation von einfachen Durchquerungen / Karten / Filtern usw.Es gibt einige technische und konzeptionelle Unterschiede, z. B. sind Java 8-Streams Pull-basierte, synchrone Wertesequenzen für den einmaligen Gebrauch, während RxJava Observables wieder beobachtbare, adaptiv Push-Pull-basierte, möglicherweise asynchrone Wertesequenzen sind. RxJava richtet sich an Java 6+ und funktioniert auch unter Android.
quelle
Java 8-Streams basieren auf Pulls. Sie iterieren über einen Java 8-Stream, der jedes Element verbraucht. Und es könnte ein endloser Strom sein.
RXJava
Observable
basiert standardmäßig auf Push. Sie abonnieren ein Observable und werden benachrichtigt, wenn der nächste Artikel eintrifft (onNext
), wenn der Stream abgeschlossen ist (onCompleted
) oder wenn ein Fehler aufgetreten ist (onError
). Denn mitObservable
Ihnen erhaltenonNext
,onCompleted
,onError
Veranstaltungen, können Sie einige leistungsfähigen Funktionen tun wie andere Kombination vonObservable
s zu einem neuen (zip
,merge
,concat
). Sie können auch zwischenspeichern, drosseln, ... und es wird mehr oder weniger dieselbe API in verschiedenen Sprachen verwendet (RxJava, RX in C #, RxJS, ...).Standardmäßig ist RxJava Single-Threaded. Wenn Sie keine Scheduler verwenden, geschieht alles im selben Thread.
quelle
Die vorhandenen Antworten sind umfassend und korrekt, aber ein klares Beispiel für Anfänger fehlt. Gestatten Sie mir, einige konkrete Begriffe wie "Push / Pull-basiert" und "wieder beobachtbar" zu formulieren. Hinweis : Ich hasse den Begriff
Observable
(es ist ein Stream um Himmels willen), daher beziehe ich mich einfach auf J8- und RX-Streams.Betrachten Sie eine Liste von ganzen Zahlen,
Ein J8-Stream ist ein Dienstprogramm zum Ändern der Sammlung. Zum Beispiel können sogar Ziffern extrahiert werden als:
Dies ist im Grunde Pythons Map, Filter, Reduce , eine sehr schöne (und längst überfällige) Ergänzung zu Java. Aber was wäre, wenn die Ziffern nicht im Voraus gesammelt würden - was wäre, wenn die Ziffern während der Ausführung der App eingehen würden - könnten wir die Geraden in Echtzeit filtern.
Stellen Sie sich vor, ein separater Thread-Prozess gibt zu zufälligen Zeiten Ganzzahlen aus, während die App ausgeführt wird (
---
bezeichnet die Zeit).even
Kann in RX auf jede neue Ziffer reagieren und den Filter in Echtzeit anwendenEs ist nicht erforderlich, Eingabe- und Ausgabelisten zu speichern. Wenn Sie eine Ausgabeliste möchten , ist dies kein Problem, das auch gestreamt werden kann. In der Tat ist alles ein Strom.
Aus diesem Grund werden Begriffe wie "zustandslos" und "funktional" eher mit RX assoziiert
quelle
RxJava ist auch eng mit der Initiative für reaktive Streams verbunden und betrachtet sich selbst als eine einfache Implementierung der API für reaktive Streams (z. B. im Vergleich zur Implementierung von Akka-Streams ). Der Hauptunterschied besteht darin, dass die reaktiven Ströme so ausgelegt sind, dass sie mit Gegendruck umgehen können. Wenn Sie sich jedoch die Seite mit den reaktiven Strömen ansehen, werden Sie auf die Idee kommen. Sie beschreiben ihre Ziele ziemlich gut und die Streams sind auch eng mit dem reaktiven Manifest verbunden .
Die Java 8-Streams sind so ziemlich die Implementierung einer unbegrenzten Sammlung, die dem Scala Stream oder der Clojure Lazy Seq ziemlich ähnlich ist .
quelle
Java 8-Streams ermöglichen die effiziente Verarbeitung sehr großer Sammlungen unter Nutzung von Multicore-Architekturen. Im Gegensatz dazu ist RxJava standardmäßig Single-Threaded (ohne Scheduler). Daher nutzt RxJava Multi-Core-Maschinen nur, wenn Sie diese Logik selbst codieren.
quelle