Wie lässt man eine Observable-Sequenz warten, bis eine andere abgeschlossen ist, bevor sie gesendet wird?

86

Sagen wir, ich habe eine Observable, wie so:

var one = someObservable.take(1);

one.subscribe(function(){ /* do something */ });

Dann habe ich eine Sekunde Observable:

var two = someOtherObservable.take(1);

Nun möchte ich subscribe()zu two, aber ich möchte sicherstellen , dass oneabgeschlossen ist , bevor die twoTeilnehmer ausgelöst wird.

Mit welcher Puffermethode kann ich twodie zweite Methode warten lassen, bis die erste abgeschlossen ist?

Ich glaube , ich bin auf der Suche zu pausieren , twobis oneabgeschlossen ist.

Stephen
quelle
1
Ich glaube, die Antwort darauf ist die .exhaustMap () -Methode, aber ich würde nicht vorgeben zu wissen, wie man sie implementiert - vollständige Beschreibung hier: blog.angular-university.io/rxjs-higher-order-mapping
Peter Nixey

Antworten:

53

Ein paar Möglichkeiten, die ich mir vorstellen kann

import {take, publish} from 'rxjs/operators'
import {concat} from 'rxjs'

//Method one

var one = someObservable.pipe(take(1));
var two = someOtherObservable.pipe(take(1));
concat(one, two).subscribe(function() {/*do something */});

//Method two, if they need to be separate for some reason
var one = someObservable.pipe(take(1));
var two = someOtherObservable.pipe(take(1), publish());
two.subscribe(function(){/*do something */});
one.subscribe(function(){/*do something */}, null, two.connect.bind(two));
Paulpdaniels
quelle
1
Am Ende habe ich pauseund resumeanstelle von publishund verwendet connect, aber Beispiel zwei ist im Wesentlichen der Weg, den ich eingeschlagen habe.
Stephen
1
Wird diese Methode immer die erste beobachtbare ( one) Auflösung vor der zweiten ( two) innerhalb der subscribe () - Funktion bewirken?
John
Warum nicht verwenden Observable.forkJoin()? Siehe diesen Link learnrxjs.io/operators/combination/forkjoin.html
mspasiuk
16
@mspasiuk gemäß der OP-Anforderung wollten sie erst, dass der zweite abonniert, nachdem der erste abgeschlossen war. forkJoinabonniert gleichzeitig.
Paulpdaniels
17

Wenn Sie sicherstellen möchten, dass die Ausführungsreihenfolge beibehalten wird, können Sie flatMap als folgendes Beispiel verwenden

const first = Rx.Observable.of(1).delay(1000).do(i => console.log(i));
const second = Rx.Observable.of(11).delay(500).do(i => console.log(i));
const third = Rx.Observable.of(111).do(i => console.log(i));

first
  .flatMap(() => second)
  .flatMap(() => third)
  .subscribe(()=> console.log('finished'));

Das Ergebnis wäre:

"1"
"11"
"111"
"finished"
Nikos Tsokos
quelle
15

skipUntil () mit last ()

skipUntil: Ignoriert emittierte Elemente, bis ein anderes Observable emittiert wurde

last: letzten Wert aus einer Sequenz ausgeben (dh warten, bis er abgeschlossen ist, dann ausgeben)

Beachten Sie, dass alles, was von dem an übergebenen Observable ausgegeben skipUntilwird, das Überspringen abbricht, weshalb wir hinzufügen müssen, um last()auf den Abschluss des Streams zu warten.

main$.skipUntil(sequence2$.pipe(last()))

Offiziell: https://rxjs-dev.firebaseapp.com/api/operators/skipUntil


Mögliches Problem: Beachten Sie, dass last()selbst ein Fehler auftritt, wenn nichts ausgegeben wird. Der last()Operator hat einen defaultParameter, jedoch nur in Verbindung mit einem Prädikat. Ich denke, wenn diese Situation ein Problem für Sie ist (wenn sequence2$sie ohne Aussendung abgeschlossen werden kann), sollte eine davon funktionieren (derzeit nicht getestet):

main$.skipUntil(sequence2$.pipe(defaultIfEmpty(undefined), last()))
main$.skipUntil(sequence2$.pipe(last(), catchError(() => of(undefined))

Beachten Sie, dass dies undefinedein gültiges Element ist, das ausgegeben werden soll, aber tatsächlich ein beliebiger Wert sein kann. Beachten Sie auch, dass dies das Rohr ist, an dem das Rohr befestigt ist, sequence2$und nicht das main$Rohr.

Simon_Weaver
quelle
Sehr ungeschickte Demo: angle-vgznak.stackblitz.io Sie müssen klicken, um die Konsolenablage zu öffnen
Simon_Weaver
Ihre Syntax ist falsch. skipUntil kann nicht direkt an eine Observable angehängt werden, da sonst die folgende Fehlermeldung angezeigt wird: 'Eigenschaft' skipUntil 'ist für den Typ' Observable <any> 'nicht vorhanden.' Sie müssen es zuerst durch .pipe ()
London804
Ja, dies ist eine alte Antwort, bevor Pipe benötigt wurde. Danke, dass du es erwähnt hast. Ich würde es jetzt aktualisieren, aber ich bin auf meinem Handy. Fühlen Sie sich frei, die Antwort zu bearbeiten.
Simon_Weaver
13

Hier ist noch eine weitere Möglichkeit, die Ergebnisauswahl von switchMap zu nutzen

var one$ = someObservable.take(1);
var two$ = someOtherObservable.take(1);
two$.switchMap(
    /** Wait for first Observable */
    () => one$,
    /** Only return the value we're actually interested in */
    (value2, value1) => value2
  )
  .subscribe((value2) => {
    /* do something */ 
  });

Da die Ergebnisauswahl der switchMap abgeschrieben wurde, finden Sie hier eine aktualisierte Version

const one$ = someObservable.pipe(take(1));
const two$ = someOtherObservable.pipe(
  take(1),
  switchMap(value2 => one$.map(_ => value2))
);
two$.subscribe(value2 => {
  /* do something */ 
});
Joe King
quelle
8

Hier ist eine wiederverwendbare Methode (es ist ein Typoskript, aber Sie können es an js anpassen):

export function waitFor<T>(signal: Observable<any>) {
    return (source: Observable<T>) =>
        new Observable<T>(observer =>
            signal.pipe(first())
                .subscribe(_ =>
                    source.subscribe(observer)
                )
        );
}

und Sie können es wie jeden Operator verwenden:

var two = someOtherObservable.pipe(waitFor(one), take(1));

Es ist im Grunde ein Operator, der das Abonnieren der beobachtbaren Quelle verzögert, bis das beobachtbare Signal das erste Ereignis aussendet.

Andrei Tătar
quelle
6

Wenn das zweite beobachtbare Element heiß ist , gibt es eine andere Möglichkeit , Pause / Wiederaufnahme durchzuführen :

var pauser = new Rx.Subject();
var source1 = Rx.Observable.interval(1000).take(1);
/* create source and pause */
var source2 = Rx.Observable.interval(1000).pausable(pauser);

source1.doOnCompleted(function () { 
  /* resume paused source2 */ 
  pauser.onNext(true);
}).subscribe(function(){
  // do something
});

source2.subscribe(function(){
  // start to recieve data 
});

Sie können auch die gepufferte Version pausableBuffered verwenden , um Daten während der Pause zu speichern.

Anton
quelle
2

Hier ist noch eine andere, aber ich fühle mich unkomplizierter und intuitiver (oder zumindest natürlicher, wenn Sie an Versprechen gewöhnt sind). Grundsätzlich erstellen Sie ein Observable mit Observable.create()Wrap oneund twoals einzelnes Observable. Dies ist sehr ähnlich wiePromise.all() möglicherweise funktioniert.

var first = someObservable.take(1);
var second = Observable.create((observer) => {
  return first.subscribe(
    function onNext(value) {
      /* do something with value like: */
      // observer.next(value);
    },
    function onError(error) {
      observer.error(error);
    },
    function onComplete() {
      someOtherObservable.take(1).subscribe(
        function onNext(value) {
          observer.next(value);
        },
        function onError(error) {
          observer.error(error);
        },
        function onComplete() {
          observer.complete();
        }
      );
    }
  );
});

Also, was ist hier los? Zuerst erstellen wir ein neues Observable. Die an übergebene Funktion mit dem Observable.create()passenden Namen onSubscriptionwird an den Beobachter übergeben (erstellt aus den Parametern, an die Sie übergebensubscribe() ), der beim Erstellen eines neuen Versprechens einem einzelnen Objekt ähnelt resolveund rejectzu einem einzigen Objekt kombiniert wird. So bringen wir die Magie zum Laufen.

In onSubscriptionabonnieren wir das erste Observable (im obigen Beispiel wurde dies genannt one). Wie wir damit umgehen nextund es errorliegt an Ihnen, aber die in meinem Beispiel angegebene Standardeinstellung sollte im Allgemeinen angemessen sein. Wenn wir jedoch die erhaltencomplete Ereignis erhalten, was bedeutet, dass onees jetzt abgeschlossen ist, können wir das nächste Observable abonnieren. Dadurch wird das zweite Observable ausgelöst, nachdem das erste abgeschlossen ist.

Der für das zweite Observable bereitgestellte Beispielbeobachter ist ziemlich einfach. Grundsätzlich verhält sich secondjetzt so, wie Sie es twoim OP erwarten würden . Insbesondere secondwird der erste und nur der erste Wert ausgegeben, der von someOtherObservable(wegen take(1)) ausgegeben und dann abgeschlossen wird, vorausgesetzt, es liegt kein Fehler vor.

Beispiel

Hier ist ein vollständiges Arbeitsbeispiel, das Sie kopieren / einfügen können, wenn Sie möchten, dass mein Beispiel im wirklichen Leben funktioniert:

var someObservable = Observable.from([1, 2, 3, 4, 5]);
var someOtherObservable = Observable.from([6, 7, 8, 9]);

var first = someObservable.take(1);
var second = Observable.create((observer) => {
  return first.subscribe(
    function onNext(value) {
      /* do something with value like: */
      observer.next(value);
    },
    function onError(error) {
      observer.error(error);
    },
    function onComplete() {
      someOtherObservable.take(1).subscribe(
        function onNext(value) {
          observer.next(value);
        },
        function onError(error) {
          observer.error(error);
        },
        function onComplete() {
          observer.complete();
        }
      );
    }
  );
}).subscribe(
  function onNext(value) {
    console.log(value);
  },
  function onError(error) {
    console.error(error);
  },
  function onComplete() {
    console.log("Done!");
  }
);

Wenn Sie sich die Konsole ansehen, wird das obige Beispiel gedruckt:

1

6

Erledigt!

c1moore
quelle
Dies war der Durchbruch, den ich brauchte, um meinen eigenen benutzerdefinierten 'Cluster (T, X, D)' - Operator zu erstellen, der nur die ersten X-Emissionen innerhalb der Zeitspanne T von der Quelle verarbeitet und Ergebnisse ausgibt, die durch die D-Verzögerung voneinander getrennt sind. Vielen Dank!
wonkim00
Ich bin froh, dass es geholfen hat, es war sehr aufschlussreich, als ich das auch realisierte.
c1moore
2

Hier ist ein benutzerdefinierter Operator, der mit TypeScript geschrieben wurde und auf ein Signal wartet, bevor Ergebnisse ausgegeben werden:

export function waitFor<T>(
    signal$: Observable<any>
) {
    return (source$: Observable<T>) =>
        new Observable<T>(observer => {
            // combineLatest emits the first value only when
            // both source and signal emitted at least once
            combineLatest([
                source$,
                signal$.pipe(
                    first(),
                ),
            ])
                .subscribe(([v]) => observer.next(v));
        });
}

Sie können es so verwenden:

two.pipe(waitFor(one))
   .subscribe(value => ...);
Sergiu
quelle
1
schönes Muster! Sie können sogar three.pipe (waitFor (eins), waitFor (zwei), take (1)) machen
David Rinck
1

Nun, ich weiß, dass dies ziemlich alt ist, aber ich denke, dass Sie möglicherweise Folgendes benötigen:

var one = someObservable.take(1);

var two = someOtherObservable.pipe(
  concatMap((twoRes) => one.pipe(mapTo(twoRes))),
  take(1)
).subscribe((twoRes) => {
   // one is completed and we get two's subscription.
})
itay oded
quelle
0

Sie können das Ergebnis von vorherigen Observable dank des Operators mergeMap (oder seines Alias flatMap ) wie folgt verwenden :

 const one = Observable.of('https://api.github.com/users');
 const two = (c) => ajax(c);//ajax from Rxjs/dom library

 one.mergeMap(two).subscribe(c => console.log(c))
Tktorza
quelle
von hier aus: learnrxjs.io/learn-rxjs/operators/transformation/mergemap - "Wenn die Reihenfolge der Emission und des Abonnements der inneren Observablen wichtig ist, versuchen Sie es mit concatMap!"
gsziszi