Es gibt zwei Teile in Rx; das Observable-Muster und eine solide Reihe von Operatoren, mit denen sie bearbeitet, transformiert und kombiniert werden können. Das beobachtbare Muster allein macht nicht viel. Gleiches gilt für Coroutinen. Es ist nur ein weiteres Paradigma, um mit Asynchronismus umzugehen. Sie können die Vor- und Nachteile von Rückrufen, Observable und Coroutinen vergleichen, um ein bestimmtes Problem zu lösen, aber Sie können ein Paradigma nicht mit einer voll funktionsfähigen Bibliothek vergleichen. Es ist wie ein Vergleich einer Sprache mit einem Framework.
Wie sind Kotlin-Coroutinen besser als RxKotlin? Ich habe noch keine Coroutinen verwendet, aber es sieht ähnlich aus wie async / wait in C #. Sie schreiben nur sequentiellen Code, alles ist so einfach wie das Schreiben von synchronem Code ... außer dass er asynchron ausgeführt wird. Es ist leichter zu begreifen.
Warum sollte ich Kotlin-Coroutinen verwenden wollen? Ich werde für mich selbst antworten. Die meiste Zeit werde ich mich an Rx halten, weil ich ereignisgesteuerte Architektur bevorzuge. Sollte sich jedoch die Situation ergeben, in der ich sequentiellen Code schreibe und in der Mitte eine asynchrone Methode aufrufen muss, werde ich gerne Coroutinen einsetzen, um dies so zu halten und zu vermeiden, dass alles in Observable eingeschlossen wird.
Bearbeiten : Jetzt, da ich Coroutinen verwende, ist es Zeit für ein Update.
RxKotlin ist nur syntaktischer Zucker für die Verwendung von RxJava in Kotlin, daher werde ich im Folgenden über RxJava und nicht über RxKotlin sprechen. Coroutinen sind ein niedrigerer Hebel und ein allgemeineres Konzept als RxJava. Sie dienen anderen Anwendungsfällen. Es gibt jedoch einen Anwendungsfall, in dem Sie RxJava und Coroutinen vergleichen können (channel
) Daten asynchron weitergegeben. Coroutinen haben hier einen klaren Vorteil gegenüber RxJava:
Coroutinen sind besser im Umgang mit Ressourcen
- In RxJava können Sie Berechnungen Disponenten zuweisen aber
subscribeOn()
und ObserveOn()
verwirrend sind. Jede Coroutine erhält einen Thread-Kontext und kehrt zum übergeordneten Kontext zurück. Für einen Kanal führen beide Seiten (Produzent, Konsument) in seinem eigenen Kontext aus. Coroutinen sind intuitiver in Bezug auf Thread- oder Threadpool-Affekte.
- Coroutinen geben mehr Kontrolle darüber, wann diese Berechnungen stattfinden. Sie können beispielsweise hand (
yield
) übergeben, priorisieren ( select
), parallelisieren (multiple producer
/ actor
on channel
) oder Ressourcen sperren (Mutex
) für eine bestimmte Berechnung . Auf dem Server (wo RxJava an erster Stelle stand) spielt dies möglicherweise keine Rolle, aber in einer Umgebung mit begrenzten Ressourcen ist diese Kontrollebene möglicherweise erforderlich.
- Aufgrund seiner reaktiven Natur passt der Gegendruck nicht gut in RxJava. Am anderen Ende
send()
des Kanals befindet sich eine Suspensionsfunktion, die suspendiert wird, wenn die Kanalkapazität erreicht ist. Es ist ein von der Natur ausgehender Gegendruck. Sie können auch offer()
kanalisieren. In diesem Fall wird der Anruf nie unterbrochen , sondern zurückgegeben, false
wenn der Kanal voll ist, und effektiv onBackpressureDrop()
von RxJava reproduziert . Oder Sie schreiben einfach Ihre eigene benutzerdefinierte Gegendrucklogik, was bei Coroutinen nicht schwierig ist, insbesondere im Vergleich zu RxJava.
Es gibt einen anderen Anwendungsfall, in dem Coroutinen glänzen und dies Ihre zweite Frage beantwortet: "Warum sollte ich Kotlin-Coroutinen verwenden wollen?". Coroutinen sind der perfekte Ersatz für Hintergrund-Threads oder AsyncTask
(Android). Es ist so einfach wie launch { someBlockingFunction() }
. Natürlich können Sie dies auch mit RxJava erreichen, indem Sie Schedulers
und Completable
vielleicht verwenden. Sie werden das Observer-Muster und die Operatoren, die die Signatur von RxJava sind, nicht (oder nur wenig) verwenden, ein Hinweis darauf, dass diese Arbeit für RxJava nicht möglich ist. Die Komplexität von RxJava (hier eine nutzlose Steuer) macht Ihren Code ausführlicher und weniger sauber als die Version von Coroutine.
Lesbarkeit ist wichtig. In dieser Hinsicht unterscheiden sich der Ansatz von RxJava und Coroutinen stark. Coroutinen sind einfacher als RxJava. Wenn Sie mit nicht wohl sind map()
, flatmap()
und funktionelle reaktive Programmierung im Allgemeinen, sind Koroutinen Manipulationen einfacher, Grundlagen Anweisungen beteiligt: for
, if
, try/catch
... Aber ich persönlich Koroutine des Code schwerer finden für nicht-triviale Aufgaben zu verstehen. Insbesondere geht es um mehr Verschachtelung und Einrückung, während die Verkettung von Operatoren in RxJava alles in einer Linie hält. Funktionale Programmierung macht die Verarbeitung expliziter. Darüber hinaus kann RxJava komplexe Transformationen mit einigen Standardoperatoren aus ihrem Rich-Operator-Set (OK, viel zu Rich) lösen. RxJava glänzt, wenn Sie komplexe Datenflüsse haben, die viele Kombinationen und Transformationen erfordern.
Ich hoffe, diese Überlegungen helfen Ihnen bei der Auswahl des richtigen Werkzeugs für Ihre Anforderungen.
Kotlin Coroutinen unterscheiden sich von Rx. Es ist schwierig, sie von Apfel zu Apfel zu vergleichen, da Kotlin-Coroutinen ein dünnes Sprachmerkmal sind (mit nur ein paar Grundkonzepten und ein paar Grundfunktionen, um sie zu manipulieren), während Rx eine ziemlich schwere Bibliothek mit einer ziemlich großen Auswahl an Äpfeln ist gebrauchsfertige Bediener. Beide wurden entwickelt, um ein Problem der asynchronen Programmierung anzugehen. Ihr Lösungsansatz ist jedoch sehr unterschiedlich:
Rx verfügt über einen bestimmten funktionalen Programmierstil, der in praktisch jeder Programmiersprache ohne Unterstützung durch die Sprache selbst implementiert werden kann. Es funktioniert gut, wenn sich das vorliegende Problem leicht in eine Folge von Standardoperatoren zerlegt und ansonsten nicht so gut.
Kotlin-Coroutinen bieten eine Sprachfunktion, mit der Bibliotheksschreiber verschiedene asynchrone Programmierstile implementieren können, einschließlich, aber nicht beschränkt auf den funktionalen reaktiven Stil (Rx). Mit Kotlin-Coroutinen können Sie Ihren asynchronen Code auch im imperativen Stil, im Versprechungs- / Zukunftsstil, im Schauspielerstil usw. schreiben.
Es ist besser, Rx mit einigen spezifischen Bibliotheken zu vergleichen, die basierend auf Kotlin-Coroutinen implementiert sind.
Nehmen Sie die Bibliothek kotlinx.coroutines als Beispiel die . Diese Bibliothek bietet eine Reihe von Grundelementen wie
async/await
und Kanäle, die normalerweise in andere Programmiersprachen eingebrannt werden. Es unterstützt auch leichte, zukunftslose Akteure. Weitere Informationen finden Sie im Handbuch zu kotlinx.coroutines anhand eines Beispiels .Von bereitgestellte Kanäle
kotlinx.coroutines
können Rx in bestimmten Anwendungsfällen ersetzen oder erweitern. Es gibt einen separaten Leitfaden für reaktive Ströme mit Coroutinen , der sich eingehender mit Ähnlichkeiten und Unterschieden mit Rx befasst.quelle
try-catch
. Sie erhalten eine sofort einsatzbereite Bereichssteuerung, die klar und visuell abgrenzt, was Sie schützen. Sie können diese Blöcke verschachteln und komplizierte Fehlerbehandlungsmuster codieren, über die Sie immer noch leicht nachdenken können. Syntaktisch kann eine Bibliothek, die auf Funktionen höherer Ordnung basiert, nur die Methodenkette verwenden. Coroutinen haben die ganze Sprache.Ich kenne RxJava sehr gut und bin kürzlich zu Kotlin Coroutines and Flow gewechselt.
RxKotlin ist im Grunde dasselbe wie RxJava, es fügt nur etwas syntaktischen Zucker hinzu, um das Schreiben von RxJava-Code in Kotlin komfortabler / idiomatischer zu gestalten.
Ein "fairer" Vergleich zwischen RxJava und Kotlin Coroutines sollte Flow in die Mischung aufnehmen und ich werde versuchen zu erklären, warum hier. Das wird ein bisschen lang, aber ich werde versuchen, es mit Beispielen so einfach wie möglich zu halten.
Mit RxJava haben Sie verschiedene Objekte (seit Version 2):
// 0-n events without backpressure management fun observeEventsA(): Observable<String> // 0-n events with explicit backpressure management fun observeEventsB(): Flowable<String> // exactly 1 event fun encrypt(original: String): Single<String> // 0-1 events fun cached(key: String): Maybe<MyData> // just completes with no specific results fun syncPending(): Completable
In Kotlin Coroutines + Flow benötigen Sie nicht viele Entitäten. Wenn Sie keinen Ereignisstrom haben, können Sie einfach Coroutinen verwenden (Suspendierungsfunktionen):
// 0-n events, the backpressure is automatically taken care off fun observeEvents(): Flow<String> // exactly 1 event suspend fun encrypt(original: String): String // 0-1 events suspend fun cached(key: String): MyData? // just completes with no specific results suspend fun syncPending()
Bonus: Kotlin Flow / Coroutines-Unterstützungswerte
null
(Unterstützung mit RxJava 2 entfernt)Was ist mit den Betreibern?
Mit RxJava haben Sie so viele Operatoren (
map
,filter
,flatMap
,switchMap
, ...), und für die meisten von ihnen gibt es eine Version für jeden Entitätstyp (Single.map()
,Observable.map()
, ...).Kotlin Coroutines + Flow benötigt nicht so viele Operatoren . Lassen Sie uns anhand einiger Beispiele zu den am häufigsten verwendeten Operatoren sehen, warum
Karte()
RxJava:
fun getPerson(id: String): Single<Person> fun observePersons(): Observable<Person> fun getPersonName(id: String): Single<String> { return getPerson(id) .map { it.firstName } } fun observePersonsNames(): Observable<String> { return observePersons() .map { it.firstName } }
Kotlin Coroutinen + Flow
suspend fun getPerson(id: String): Person fun observePersons(): Flow<Person> suspend fun getPersonName(id: String): String? { return getPerson(id).firstName } fun observePersonsNames(): Flow<String> { return observePersons() .map { it.firstName } }
Sie benötigen keinen Operator für den "Einzelfall" und er ist für den Fall ziemlich ähnlich
Flow
Fall .flatMap ()
Angenommen, Sie müssen für jede Person aus einer Datenbank (oder einem Remote-Service) die Versicherung abrufen
RxJava
fun fetchInsurance(insuranceId: String): Single<Insurance> fun getPersonInsurance(id: String): Single<Insurance> { return getPerson(id) .flatMap { person -> fetchInsurance(person.insuranceId) } } fun obseverPersonsInsurances(): Observable<Insurance> { return observePersons() .flatMap { person -> fetchInsurance(person.insuranceId) // this is a Single .toObservable() // flatMap expect an Observable } }
Mal sehen mit Kotlin Coroutiens + Flow
suspend fun fetchInsurance(insuranceId: String): Insurance suspend fun getPersonInsurance(id: String): Insurance { val person = getPerson(id) return fetchInsurance(person.insuranceId) } fun obseverPersonsInsurances(): Flow<Insurance> { return observePersons() .map { person -> fetchInsurance(person.insuranceId) } }
Wie zuvor benötigen wir in dem einfachen Coroutine-Fall keine Operatoren. Wir schreiben den Code einfach so, als ob er nicht asynchron wäre, sondern verwenden nur Suspending-Funktionen.
Und
Flow
da dies KEIN Tippfehler ist, ist keinflatMap
Operator erforderlich , wir können ihn nur verwendenmap
. Und der Grund ist, dass Map Lambda eine Suspendierungsfunktion ist! Wir können Suspending Code darin ausführen !!!Dafür brauchen wir keinen anderen Operator.
Für komplexere Aufgaben können Sie den Flow-
transform()
Operator verwenden.Jeder Flow-Operator akzeptiert eine Suspendierungsfunktion!
Wenn Sie also müssen,
filter()
aber Ihr Filter einen Netzwerkanruf ausführen muss, können Sie!fun observePersonsWithValidInsurance(): Flow<Person> { return observerPersons() .filter { person -> val insurance = fetchInsurance(person.insuranceId) insurance.isValid() } }
delay (), startWith (), concatWith (), ...
In RxJava gibt es viele Operatoren zum Anwenden von Verzögerungen oder zum Hinzufügen von Elementen vor und nach:
Mit Kotlin Flow können Sie einfach:
grabMyFlow() .onStart { // delay by 3 seconds before starting delay(3000L) // just emitting an item first emit("First item!") emit(cachedItem()) // call another suspending function and emit the result } .onEach { value -> // insert a delay of 1 second after a value only on some condition if (value.length() > 5) { delay(1000L) } } .onCompletion { val endingSequence: Flow<String> = grabEndingSequence() emitAll(endingSequence) }
Fehlerbehandlung
RxJava hat viele Operatoren, um Fehler zu behandeln:
Mit Flow brauchen Sie nicht viel mehr als den Operator
catch()
:grabMyFlow() .catch { error -> // emit something from the flow emit("We got an error: $error.message") // then if we can recover from this error emit it if (error is RecoverableError) { // error.recover() here is supposed to return a Flow<> to recover emitAll(error.recover()) } else { // re-throw the error if we can't recover (aka = don't catch it) throw error } }
und mit Suspending-Funktion können Sie einfach verwenden
try {} catch() {}
.einfach zu schreibende Flow-Operatoren
Aufgrund der Coroutinen, die Flow unter der Haube antreiben, ist es viel einfacher, Operatoren zu schreiben. Wenn Sie jemals einen RxJava-Operator überprüft haben, werden Sie sehen, wie schwierig es ist und wie viele Dinge Sie lernen müssen.
Das Schreiben von Kotlin Flow-Operatoren ist einfacher. Sie können sich ein Bild machen, indem Sie sich den Quellcode der Operatoren ansehen, die hier bereits Teil von Flow sind . Der Grund dafür ist, dass Coroutinen das Schreiben von asynchronem Code erleichtern und die Verwendung von Operatoren nur natürlicher ist.
Als Bonus sind alle Flow-Operatoren alle Kotlin-Erweiterungsfunktionen. Dies bedeutet, dass Sie oder Bibliotheken einfach Operatoren hinzufügen können und sich nicht komisch anfühlen (z. B.
observable.lift()
oderobservable.compose()
).Upstream-Thread leckt nicht Downstream
Was bedeutet das überhaupt?
Nehmen wir dieses RxJava-Beispiel:
urlsToCall() .switchMap { url -> if (url.scheme == "local") { val data = grabFromMemory(url.path) Flowable.just(data) } else { performNetworkCall(url) .subscribeOn(Subscribers.io()) .toObservable() } } .subscribe { // in which thread is this call executed? }
Wo ist also der Rückruf?
subscribe
ausgeführt?Die Antwort ist:
Wenn es aus dem Netzwerk kommt, befindet es sich in einem E / A-Thread. Wenn es aus dem anderen Zweig stammt, ist es undefiniert. Dies hängt davon ab, welcher Thread zum Senden der URL verwendet wird.
Dies ist das Konzept des "stromaufwärtigen Gewindes, das stromabwärts austritt".
Bei Flow und Coroutines ist dies nicht der Fall, es sei denn, Sie benötigen dieses Verhalten ausdrücklich (using
Dispatchers.Unconfined
).suspend fun myFunction() { // execute this coroutine body in the main thread withContext(Dispatchers.Main) { urlsToCall() .conflate() // to achieve the effect of switchMap .transform { url -> if (url.scheme == "local") { val data = grabFromMemory(url.path) emit(data) } else { withContext(Dispatchers.IO) { performNetworkCall(url) } } } .collect { // this will always execute in the main thread // because this is where we collect, // inside withContext(Dispatchers.Main) } } }
Coroutines-Code wird in dem Kontext ausgeführt, in dem sie ausgeführt wurden. Und nur der Teil mit dem Netzwerkaufruf wird auf dem E / A-Thread ausgeführt, während alles andere, was wir hier sehen, auf dem Hauptthread ausgeführt wird.
Nun, eigentlich wissen wir nicht, wo Code ausgeführt
grabFromMemory()
wird. Wenn es sich um eine Suspendierungsfunktion handelt, wissen wir nur, dass er innerhalb des Hauptthreads aufgerufen wird, aber innerhalb dieser Suspendierungsfunktion könnte ein anderer Dispatcher verwendet werden, aber wann wird er verwendet Komm zurück mit dem Ergebnis,val data
das wird wieder im Haupt-Thread sein.Wenn Sie sich einen Code ansehen, ist es einfacher zu erkennen, in welchem Thread er ausgeführt wird, wenn Sie einen expliziten Dispatcher sehen = es ist dieser Dispatcher, wenn Sie ihn nicht sehen: In welchem Thread-Dispatcher auch immer der Suspendierungsaufruf angezeigt wird wird gerufen.
Strukturierte Parallelität
Dies ist kein von Kotlin erfundenes Konzept, aber es ist etwas, das sie mehr als jede andere Sprache, die ich kenne, angenommen haben.
Wenn das, was ich hier erkläre, nicht ausreicht, lesen Sie diesen Artikel oder sehen Sie sich dieses Video an .
Also, was ist es?
Mit RxJava abonnieren Sie Observables und sie geben Ihnen ein
Disposable
Objekt.Sie müssen sich darum kümmern, es zu entsorgen, wenn es nicht mehr benötigt wird. Normalerweise behalten Sie also einen Verweis darauf (oder fügen ihn in ein
CompositeDisposable
) ein, um ihn später aufzurufendispose()
, wenn er nicht mehr benötigt wird. Wenn Sie dies nicht tun, gibt Ihnen der Linter eine Warnung.RxJava ist etwas schöner als ein traditioneller Thread. Wenn Sie einen neuen Thread erstellen und etwas darauf ausführen, ist dies ein "Feuer und Vergessen". Sie haben nicht einmal die Möglichkeit, ihn abzubrechen:
Thread.stop()
Ist veraltet, schädlich und die jüngste Implementierung führt tatsächlich nichts aus.Thread.interrupt()
Lässt Ihren Thread versagen usw. Alle Ausnahmen gehen verloren. Sie erhalten das Bild.Mit Kotlin Coroutinen und Flow kehren sie das "Einweg" -Konzept um. Sie können keine Coroutine ohne ein erstellen
CoroutineContext
.Dieser Kontext definiert die
scope
Ihrer Coroutine. Jede Kinder-Coroutine, die in dieser hervorgebracht wird, hat den gleichen Umfang.Wenn Sie einen Flow abonnieren, müssen Sie sich in einer Coroutine befinden oder auch einen Bereich angeben.
Sie können weiterhin die Coroutinen, die Sie starten (
Job
) , referenzieren und abbrechen. Dadurch wird jedes Kind dieser Coroutine automatisch abgebrochen.Wenn Sie ein Android-Entwickler sind, erhalten Sie diese Bereiche automatisch. Beispiel:
viewModelScope
und Sie können Coroutinen in einem viewModel mit diesem Bereich starten, da Sie wissen, dass sie automatisch gelöscht werden, wenn das Ansichtsmodell gelöscht wird.viewModelScope.launch { // my coroutine here }
Ein Teil des Bereichs wird beendet, wenn Kinder versagen, ein anderer Bereich lässt jedes Kind seinen eigenen Lebenszyklus verlassen, ohne andere Kinder zu stoppen, wenn eines ausfällt (
SupervisedJob
).Warum ist das eine gute Sache?
Lassen Sie mich versuchen, es wie Roman Elizarov zu erklären getan hat.
Einige alte Programmiersprachen hatten dieses Konzept, mit
goto
dem Sie nach Belieben von einer Codezeile zur nächsten springen können.Sehr mächtig, aber wenn Sie missbraucht werden, kann dies zu sehr schwer verständlichem Code führen, der schwer zu debuggen und zu begründen ist.
Neue Programmiersprachen haben es schließlich vollständig aus der Sprache entfernt.
Wenn Sie
if
oderwhile
oderwhen
es ist viel einfacher, über den Code nachzudenken: Egal, was in diesen Blöcken passiert, Sie werden irgendwann aus ihnen herauskommen, es ist ein "Kontext", Sie haben keine seltsamen Sprünge rein und raus .Das Starten eines Threads oder das Abonnieren eines RxJava-Observable ähnelt dem goto: Sie führen Code aus, der so lange ausgeführt wird, bis "anderswo" gestoppt wird.
Wenn Sie bei Coroutinen die Angabe eines Kontexts / Bereichs verlangen, wissen Sie, dass Coroutinen nach Abschluss Ihres Kontexts abgeschlossen sind, wenn Ihr Kontext abgeschlossen ist, unabhängig davon, ob Sie einzelne Coroutinen oder 10 Tausend haben.
Sie können immer noch mit Coroutinen "gehen", indem Sie verwenden
GlobalScope
, was Sie nicht aus dem gleichen Grund tun sollten, den Sie nichtgoto
in Sprachen verwenden sollten, die es bereitstellen.Irgendwelche Nachteile?
Flow befindet sich noch in der Entwicklung und einige Funktionen, die derzeit in RxJava verfügbar sind, sind in Kotlin Coroutines Flow noch nicht verfügbar.
Der große fehlt, gerade jetzt, ist
share()
Betreiber und seine Freunde (publish()
,replay()
etc ...)Sie befinden sich derzeit in einem fortgeschrittenen Entwicklungsstadium und werden voraussichtlich bald (kurz nach dem bereits veröffentlichten Kotlin
1.4.0
) veröffentlicht. Das API-Design finden Sie hier :quelle
Das von Ihnen verknüpfte Gespräch / Dokument spricht nicht über Kanäle. Kanäle füllen die Lücke zwischen Ihrem aktuellen Verständnis von Coroutinen und ereignisgesteuerter Programmierung.
Mit Coroutinen und Kanälen können Sie ereignisgesteuerte Programmierungen durchführen, wie Sie es wahrscheinlich mit rx gewohnt sind, aber Sie können dies mit synchron aussehendem Code und ohne so viele "benutzerdefinierte" Operatoren tun.
Wenn Sie dies besser verstehen möchten, schlage ich vor, außerhalb von Kotlin zu schauen, wo diese Konzepte ausgereifter und verfeinert sind (nicht experimentell). Schauen Sie sich
core.async
Clojure, Rich Hickey Videos, Beiträge und verwandte Diskussionen an.quelle
Coroutinen bieten ein leichtes asynchrones Programmierframework. Leichtgewicht in Bezug auf Ressourcen, die zum Starten des asynchronen Jobs benötigt werden. Coroutinen erzwingen keine Verwendung einer externen API und sind für die Benutzer (Programmierer) natürlicher. Im Gegensatz dazu verfügt RxJava + RxKotlin über ein zusätzliches Datenverarbeitungspaket, das in Kotlin nicht wirklich benötigt wird und über eine sehr umfangreiche API in der Standardbibliothek für die Verarbeitung von Sequenzen und Sammlungen verfügt.
Wenn Sie mehr über die praktische Verwendung von Coroutinen unter Android erfahren möchten, kann ich meinen Artikel empfehlen: https://www.netguru.com/codestories/android-coroutines-%EF%B8%8Fin-2020
quelle