Rxjs: Observable.combineLatest vs Observable.forkJoin

84

Fragen Sie sich nur, was Unterschiede zwischen Observable.combineLatestund sind Observable.forkJoin. Soweit ich sehen kann, besteht der einzige Unterschied darin, forkJoindass erwartet wird, dass die Observables abgeschlossen sind, während combineLatestdie neuesten Werte zurückgegeben werden.

Tuong Le
quelle
4
Hinweis: In rxjs6 + das sind jetzt nur noch combineLatest()und forkJoin()Funktionen , die eine beobachtbare erstellen. Sie machen dasselbe, aber die Syntax ist unterschiedlich. Verwechseln Sie nicht, combineLatestvon rxjs/operatorswelchem ​​ein "pipeable" Operator ist. Wenn Sie den falschen importieren, werden Fehler angezeigt.
Simon_Weaver

Antworten:

126

Es müssen nicht nur forkJoinalle Eingabe-Observablen ausgefüllt werden, sondern es wird auch eine Observable zurückgegeben, die einen einzelnen Wert erzeugt, der ein Array der letzten von den Eingabe-Observablen erzeugten Werte ist. Mit anderen Worten, es wartet, bis die letzte beobachtbare Eingabe abgeschlossen ist, und erzeugt dann einen einzelnen Wert und wird abgeschlossen.

Im Gegensatz dazu wird combineLatestein Observable zurückgegeben, das jedes Mal einen neuen Wert erzeugt, wenn die eingegebenen Observablen dies tun, sobald alle eingegebenen Observablen mindestens einen Wert erzeugt haben. Dies bedeutet, dass es unendlich viele Werte haben kann und möglicherweise nicht vollständig ist. Dies bedeutet auch, dass die eingegebenen Observablen nicht vervollständigt werden müssen, bevor ein Wert erzeugt wird.

GregL
quelle
@ GregL gibt es eine Funktion, die wie forkJoin funktioniert, aber auch mit fehlgeschlagenen http-Aufrufen funktioniert?
Tukkan
2
@Tukkan, ich würde einen Operator, der Fehler wiederherstellt, mit jedem http-Observable verketten, sodass Sie die Werte definieren, die für jede fehlerhafte Anforderung verwendet werden sollen, anstatt nach einem Operator zu suchen, der mehrere Observables kombiniert, die möglicherweise fehlerhaft sind (I. Ich bin mir nicht sicher, ob ein solcher Operator existiert. Zu den Operatoren, die sich von Fehlern erholen, gehören .catch(), .onErrorResumeNext()und möglicherweise .retry()(wenn der HTTP-Aufruf zeitweise fehlschlägt).
GregL
1
Zur Verdeutlichung produzieren beide Arrays.
Simon_Weaver
@ Simon_Weaver Nicht unbedingt. Im Fall von combineLatest()können Sie eine Projektionsfunktion angeben, um anzugeben, wie ein Ausgabewert aus den neuesten Werten aus den Eingabe-Observablen erzeugt werden soll. Wenn Sie keinen angeben, erhalten Sie standardmäßig ein Array der zuletzt ausgegebenen Werte.
GregL
Jede Emission von combinLatest ist ein Array der gesammelten Werte. Ich wollte das nur hinzufügen, da Sie nur Array für Forkjoin erwähnt hatten. Auf diese Weise sind sie gleich. Und ja, es gibt immer Feinheiten bei RxJS. Wenn ich also etwas verpasst habe, können Sie klarstellen, was Sie gemeint haben.
Simon_Weaver
14

Ebenfalls:

combinLatest (...) führt Observables nacheinander aus

combineLatestintern verwendet concat, was bedeutet, dass für jede im Array beobachtbare Größe ein Wert ermittelt werden muss, bevor mit der nächsten fortgefahren werden kann.

// partial source of static combineLatest (uses the rxjs/operators combineLatest internally):
// If you're using typescript then the output array will be strongly typed based on type inference
return function (source) { return source.lift.call(from_1.from([source].concat(observables)), 
                           new combineLatest_1.CombineLatestOperator(project)); };

forkJoin (...) führt gleichzeitig Observables parallel aus

Wenn Sie 3 Observable-Quellen haben und deren Ausführung jeweils 5 Sekunden dauert, dauert die Ausführung 15 Sekunden combineLatest. Während forkJoinsie parallel ausgeführt werden, dauert es 5 Sekunden.

Funktioniert also forkJoineher dort, Promise.all(...)wo die Reihenfolge nicht durchgesetzt wird.

Überlegungen zur Fehlerbehandlung:

Wenn eines der Observablen fehlerhaft ist - mit combineLatestden folgenden wird es nicht ausgeführt, aber mit forkJoinihnen laufen alle. So combineLatestkann nützlich sein , eine ‚Folge‘ von Observablen auszuführen und alle zusammen das Ergebnis zu sammeln.


Erweiterter Hinweis: Wenn die Quell-Observables bereits ausgeführt werden (von etwas anderem abonniert) und Sie sharesie verwenden, wird dieses Verhalten nicht angezeigt.

Noch weiter fortgeschrittener Hinweis: CombineLatest gibt Ihnen immer das Neueste aus jeder Quelle. Wenn also eine der beobachtbaren Quellen mehrere Werte ausgibt, erhalten Sie das Neueste. Es wird nicht nur ein einzelner Wert für jede Quelle abgerufen und zur nächsten übergegangen. Wenn Sie sicherstellen müssen, dass Sie nur das nächste verfügbare Element für jede beobachtbare Quelle erhalten, können Sie .pipe(take(1))die beobachtbare Quelle hinzufügen , während Sie sie dem Eingabearray hinzufügen.

Simon_Weaver
quelle
Vielen Dank für Ihre Überlegungen zur Fehlerbehandlung. Die Fehlerbehandlung ist bei mehreren Observablen ziemlich schwer zu verstehen.
D Deshmane
2
Ich glaube, Sie haben falsch interpretiert, was concat()der combineLatest()Code tut. Es sieht für mich nach der Array.prototype.concatMethode aus, nicht nach der RxJS- concatMethode. Angenommen, ich habe Recht, dann ist diese Antwort irreführend und falsch, da combineLatest()Observable nicht einzeln nacheinander ausgeführt werden. Es führt sie parallel aus, genauso wie forkJoin(). Der Unterschied besteht darin, wie viele Werte erzeugt werden und ob die beobachtbaren Quellen vervollständigt werden müssen oder nicht.
GregL
@GregL Array.concat ist mit Observablen bedeutungslos. Ich verlasse mich absolut auf die sequentielle Natur von concat - das Wichtigste zu verstehen ist, dass Sie keine Ausgabewerte erhalten, bis alle ausgeführt wurden.
Simon_Weaver
Ich habe mich in dem von Ihnen geposteten Beispielquellcode auf diesen Code bezogen: [source].concat(observables)Angenommen, das haben Sie gemeint, als Sie sagten, dass " combineLatestintern verwendet concat". Mir scheint, Sie verwechseln Array Concat mit RxJS Concat. Letzterer führt die eingegebenen Observablen tatsächlich nacheinander aus und wartet, bis alle abgeschlossen sind. Dies wird jedoch nicht von combineLatest()einem separaten Operator verwendet.
GregL
3
Ich glaube combineLatest()und forkJoin()beide laufen parallel. Wenn Sie den folgenden Code ausführen, werden für beide ungefähr 5000 ausgegeben. const start = new Date().getTime(); combineLatest([of(null).pipe(delay(5000)), of(null).pipe(delay(5000)), of(null).pipe(delay(5000))]).subscribe(() => console.log(new Date().getTime() - start)); forkJoin([of(null).pipe(delay(5000)), of(null).pipe(delay(5000)), of(null).pipe(delay(5000))]).subscribe(() => console.log(new Date().getTime() - start));
Jeremy
12

forkJoin - Wenn alle Observablen abgeschlossen sind, geben Sie jeweils den zuletzt ausgegebenen Wert aus.

combinLatest - Wenn ein Observable einen Wert ausgibt , geben Sie jeweils den neuesten Wert aus.

Die Verwendung ist ziemlich ähnlich, aber Sie sollten nicht vergessen, sich im Gegensatz zu forkJoin von combinLatest abzumelden .

Dmitry Grinko
quelle
1
Wenn eine Anforderung fehlschlägt, werden alle verschachtelten Anforderungen automatisch abgebrochen. Wie kann ich das lösen?
Sunil Garg
1
@SunilGarg Sie sollten forkJoin oder combinLatest verwenden, wenn Sie nur alle Ergebnisse erhalten möchten. Wenn Sie nicht alle Ergebnisse interessieren, sollten Sie Abonnements separat verwenden.
Dmitry Grinko
können Sie diese stackoverflow.com/questions/55558277/…
Sunil Garg
Können Sie anhand des Codebeispiels erklären, warum das Abbestellen erforderlich ist?
Sunil Garg