Ich habe mir das neue RX Java 2 angesehen und bin mir nicht ganz sicher, ob ich die Idee von verstehe backpressure
mehr ...
Mir ist bewusst, dass wir Observable
das nicht backpressure
unterstützen und habenFlowable
das hat es.
Nehmen wir also anhand eines Beispiels an, ich habe flowable
mit interval
:
Flowable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
// do smth
}
});
Dies wird nach ungefähr 128 Werten abstürzen, und das ist ziemlich offensichtlich, dass ich langsamer konsumiere als Gegenstände zu bekommen.
Aber dann haben wir das gleiche mit Observable
Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
// do smth
}
});
Dies wird überhaupt nicht abstürzen, selbst wenn ich den Verbrauch etwas verzögere, funktioniert es immer noch. Um die Flowable
Arbeit kann sagen , Ich habe onBackpressureDrop
Bediener, crash ist weg , aber nicht alle Werte sind entweder emittiert.
Die Grundfrage, die ich derzeit nicht in meinem Kopf beantworten kann, ist, warum ich mich darum kümmern sollte, backpressure
wenn ich einfach verwenden kann und Observable
trotzdem alle Werte erhalte, ohne die zu verwalten buffer
. Oder vielleicht von der anderen Seite, welche Vorteile bieten backpressure
mir für die Verwaltung und den Umgang mit dem Konsum?
Antworten:
Was sich in der Praxis im Gegendruck manifestiert, sind begrenzte Puffer
Flowable.observeOn
mit einem Puffer von 128 Elementen, der so schnell entleert wird, wie es der Dowstream aufnehmen kann. Sie können diese Puffergröße einzeln erhöhen, um die Bursty-Quelle zu verarbeiten, und alle Verfahren zur Verwaltung des Gegendrucks gelten weiterhin ab 1.x.Observable.observeOn
verfügt über einen unbegrenzten Puffer, der die Elemente ständig sammelt, und Ihre App verfügt möglicherweise nicht über genügend Speicher.Sie können
Observable
zum Beispiel verwenden:Sie können
Flowable
zum Beispiel verwenden:quelle
Maybe
,Single
undCompletable
kann immer statt verwendet werden ,Flowable
wenn sie semantisch angemessen sind?Maybe
,Single
, undCompletable
sind weit zu klein jede Notwendigkeit des Rückstau Konzept zu haben. Es gibt keine Chance, dass ein Produzent Artikel schneller emittieren kann als sie konsumiert werden können, da 0–1 Artikel jemals produziert oder konsumiert werden.Gegendruck tritt auf, wenn Ihr Observable (Publisher) mehr Ereignisse erstellt, als Ihr Abonnent verarbeiten kann. So können Abonnenten Ereignisse verpassen oder eine große Warteschlange von Ereignissen erhalten, die letztendlich zu Speichermangel führen.
Flowable
berücksichtigt den Gegendruck.Observable
nicht. Das ist es.es erinnert mich an einen Trichter, der überläuft, wenn zu viel Flüssigkeit überläuft. Flowable kann helfen, dies nicht zu erreichen:
mit enormem Gegendruck:
Bei Verwendung von fließfähigem Material ist der Gegendruck jedoch viel geringer:
Rxjava2 verfügt über einige Gegendruckstrategien, die Sie je nach Anwendungsfall anwenden können. Mit Strategie meine ich, dass Rxjava2 eine Möglichkeit bietet, mit Objekten umzugehen, die aufgrund des Überlaufs (Gegendruck) nicht verarbeitet werden können.
Hier sind die Strategien. Ich werde nicht alle durchgehen, aber wenn Sie sich zum Beispiel keine Sorgen über die überfüllten Elemente machen möchten, können Sie eine Drop-Strategie wie die folgende verwenden:
Observable.toFlowable (BackpressureStrategy.DROP)
Soweit ich weiß, sollte es ein Limit von 128 Elementen in der Warteschlange geben, danach kann es zu einem Überlauf (Gegendruck) kommen. Auch wenn es nicht 128 ist, ist es nahe an dieser Zahl. Hoffe das hilft jemandem.
Wenn Sie die Puffergröße von 128 ändern müssen, sieht es so aus (aber beachten Sie alle Speicherbeschränkungen:
Bei der Softwareentwicklung bedeutet eine Gegendruckstrategie normalerweise, dass Sie den Emitter anweisen, etwas langsamer zu werden, da der Verbraucher die Geschwindigkeit Ihrer emittierenden Ereignisse nicht bewältigen kann.
quelle
Die Tatsache, dass Sie
Flowable
nach der Ausgabe von 128 Werten ohne Gegendruckbehandlung abgestürzt sind, bedeutet nicht, dass es immer nach genau 128 Werten abstürzt: Manchmal stürzt es nach 10 ab, manchmal stürzt es überhaupt nicht ab. Ich glaube, dies ist passiert, als Sie das Beispiel ausprobiert habenObservable
- es gab zufällig keinen Gegendruck, sodass Ihr Code normal funktionierte, beim nächsten Mal möglicherweise nicht. Der Unterschied in RxJava 2 besteht darin, dass es inObservable
s kein Konzept für Gegendruck mehr gibt und keine Möglichkeit, damit umzugehen . Wenn Sie eine reaktive Sequenz entwerfen, die wahrscheinlich eine explizite Behandlung des Gegendrucks erfordert,Flowable
ist dies die beste Wahl.quelle
interval
ohne handhabe,backpressure
würde ich dann ein seltsames Verhalten oder Probleme erwarten?