Sagen wir, ich habe eine Observable
, wie so:
var one = someObservable.take(1);
one.subscribe(function(){ /* do something */ });
Dann habe ich eine Sekunde Observable
:
var two = someOtherObservable.take(1);
Nun möchte ich subscribe()
zu two
, aber ich möchte sicherstellen , dass one
abgeschlossen ist , bevor die two
Teilnehmer ausgelöst wird.
Mit welcher Puffermethode kann ich two
die zweite Methode warten lassen, bis die erste abgeschlossen ist?
Ich glaube , ich bin auf der Suche zu pausieren , two
bis one
abgeschlossen ist.
javascript
observable
rxjs
Stephen
quelle
quelle
Antworten:
Ein paar Möglichkeiten, die ich mir vorstellen kann
import {take, publish} from 'rxjs/operators' import {concat} from 'rxjs' //Method one var one = someObservable.pipe(take(1)); var two = someOtherObservable.pipe(take(1)); concat(one, two).subscribe(function() {/*do something */}); //Method two, if they need to be separate for some reason var one = someObservable.pipe(take(1)); var two = someOtherObservable.pipe(take(1), publish()); two.subscribe(function(){/*do something */}); one.subscribe(function(){/*do something */}, null, two.connect.bind(two));
quelle
pause
undresume
anstelle vonpublish
und verwendetconnect
, aber Beispiel zwei ist im Wesentlichen der Weg, den ich eingeschlagen habe.one
) Auflösung vor der zweiten (two
) innerhalb der subscribe () - Funktion bewirken?Observable.forkJoin()
? Siehe diesen Link learnrxjs.io/operators/combination/forkjoin.htmlforkJoin
abonniert gleichzeitig.Wenn Sie sicherstellen möchten, dass die Ausführungsreihenfolge beibehalten wird, können Sie flatMap als folgendes Beispiel verwenden
const first = Rx.Observable.of(1).delay(1000).do(i => console.log(i)); const second = Rx.Observable.of(11).delay(500).do(i => console.log(i)); const third = Rx.Observable.of(111).do(i => console.log(i)); first .flatMap(() => second) .flatMap(() => third) .subscribe(()=> console.log('finished'));
Das Ergebnis wäre:
"1" "11" "111" "finished"
quelle
skipUntil () mit last ()
skipUntil: Ignoriert emittierte Elemente, bis ein anderes Observable emittiert wurde
last: letzten Wert aus einer Sequenz ausgeben (dh warten, bis er abgeschlossen ist, dann ausgeben)
Beachten Sie, dass alles, was von dem an übergebenen Observable ausgegeben
skipUntil
wird, das Überspringen abbricht, weshalb wir hinzufügen müssen, umlast()
auf den Abschluss des Streams zu warten.Offiziell: https://rxjs-dev.firebaseapp.com/api/operators/skipUntil
Mögliches Problem: Beachten Sie, dass
last()
selbst ein Fehler auftritt, wenn nichts ausgegeben wird. Derlast()
Operator hat einendefault
Parameter, jedoch nur in Verbindung mit einem Prädikat. Ich denke, wenn diese Situation ein Problem für Sie ist (wennsequence2$
sie ohne Aussendung abgeschlossen werden kann), sollte eine davon funktionieren (derzeit nicht getestet):main$.skipUntil(sequence2$.pipe(defaultIfEmpty(undefined), last())) main$.skipUntil(sequence2$.pipe(last(), catchError(() => of(undefined))
Beachten Sie, dass dies
undefined
ein gültiges Element ist, das ausgegeben werden soll, aber tatsächlich ein beliebiger Wert sein kann. Beachten Sie auch, dass dies das Rohr ist, an dem das Rohr befestigt ist,sequence2$
und nicht dasmain$
Rohr.quelle
Hier ist noch eine weitere Möglichkeit, die Ergebnisauswahl von switchMap zu nutzen
var one$ = someObservable.take(1); var two$ = someOtherObservable.take(1); two$.switchMap( /** Wait for first Observable */ () => one$, /** Only return the value we're actually interested in */ (value2, value1) => value2 ) .subscribe((value2) => { /* do something */ });
Da die Ergebnisauswahl der switchMap abgeschrieben wurde, finden Sie hier eine aktualisierte Version
const one$ = someObservable.pipe(take(1)); const two$ = someOtherObservable.pipe( take(1), switchMap(value2 => one$.map(_ => value2)) ); two$.subscribe(value2 => { /* do something */ });
quelle
Hier ist eine wiederverwendbare Methode (es ist ein Typoskript, aber Sie können es an js anpassen):
export function waitFor<T>(signal: Observable<any>) { return (source: Observable<T>) => new Observable<T>(observer => signal.pipe(first()) .subscribe(_ => source.subscribe(observer) ) ); }
und Sie können es wie jeden Operator verwenden:
var two = someOtherObservable.pipe(waitFor(one), take(1));
Es ist im Grunde ein Operator, der das Abonnieren der beobachtbaren Quelle verzögert, bis das beobachtbare Signal das erste Ereignis aussendet.
quelle
Wenn das zweite beobachtbare Element heiß ist , gibt es eine andere Möglichkeit , Pause / Wiederaufnahme durchzuführen :
var pauser = new Rx.Subject(); var source1 = Rx.Observable.interval(1000).take(1); /* create source and pause */ var source2 = Rx.Observable.interval(1000).pausable(pauser); source1.doOnCompleted(function () { /* resume paused source2 */ pauser.onNext(true); }).subscribe(function(){ // do something }); source2.subscribe(function(){ // start to recieve data });
Sie können auch die gepufferte Version pausableBuffered verwenden , um Daten während der Pause zu speichern.
quelle
Hier ist noch eine andere, aber ich fühle mich unkomplizierter und intuitiver (oder zumindest natürlicher, wenn Sie an Versprechen gewöhnt sind). Grundsätzlich erstellen Sie ein Observable mit
Observable.create()
Wrapone
undtwo
als einzelnes Observable. Dies ist sehr ähnlich wiePromise.all()
möglicherweise funktioniert.var first = someObservable.take(1); var second = Observable.create((observer) => { return first.subscribe( function onNext(value) { /* do something with value like: */ // observer.next(value); }, function onError(error) { observer.error(error); }, function onComplete() { someOtherObservable.take(1).subscribe( function onNext(value) { observer.next(value); }, function onError(error) { observer.error(error); }, function onComplete() { observer.complete(); } ); } ); });
Also, was ist hier los? Zuerst erstellen wir ein neues Observable. Die an übergebene Funktion mit dem
Observable.create()
passenden NamenonSubscription
wird an den Beobachter übergeben (erstellt aus den Parametern, an die Sie übergebensubscribe()
), der beim Erstellen eines neuen Versprechens einem einzelnen Objekt ähneltresolve
undreject
zu einem einzigen Objekt kombiniert wird. So bringen wir die Magie zum Laufen.In
onSubscription
abonnieren wir das erste Observable (im obigen Beispiel wurde dies genanntone
). Wie wir damit umgehennext
und eserror
liegt an Ihnen, aber die in meinem Beispiel angegebene Standardeinstellung sollte im Allgemeinen angemessen sein. Wenn wir jedoch die erhaltencomplete
Ereignis erhalten, was bedeutet, dassone
es jetzt abgeschlossen ist, können wir das nächste Observable abonnieren. Dadurch wird das zweite Observable ausgelöst, nachdem das erste abgeschlossen ist.Der für das zweite Observable bereitgestellte Beispielbeobachter ist ziemlich einfach. Grundsätzlich verhält sich
second
jetzt so, wie Sie estwo
im OP erwarten würden . Insbesonderesecond
wird der erste und nur der erste Wert ausgegeben, der vonsomeOtherObservable
(wegentake(1)
) ausgegeben und dann abgeschlossen wird, vorausgesetzt, es liegt kein Fehler vor.Beispiel
Hier ist ein vollständiges Arbeitsbeispiel, das Sie kopieren / einfügen können, wenn Sie möchten, dass mein Beispiel im wirklichen Leben funktioniert:
var someObservable = Observable.from([1, 2, 3, 4, 5]); var someOtherObservable = Observable.from([6, 7, 8, 9]); var first = someObservable.take(1); var second = Observable.create((observer) => { return first.subscribe( function onNext(value) { /* do something with value like: */ observer.next(value); }, function onError(error) { observer.error(error); }, function onComplete() { someOtherObservable.take(1).subscribe( function onNext(value) { observer.next(value); }, function onError(error) { observer.error(error); }, function onComplete() { observer.complete(); } ); } ); }).subscribe( function onNext(value) { console.log(value); }, function onError(error) { console.error(error); }, function onComplete() { console.log("Done!"); } );
Wenn Sie sich die Konsole ansehen, wird das obige Beispiel gedruckt:
quelle
Hier ist ein benutzerdefinierter Operator, der mit TypeScript geschrieben wurde und auf ein Signal wartet, bevor Ergebnisse ausgegeben werden:
export function waitFor<T>( signal$: Observable<any> ) { return (source$: Observable<T>) => new Observable<T>(observer => { // combineLatest emits the first value only when // both source and signal emitted at least once combineLatest([ source$, signal$.pipe( first(), ), ]) .subscribe(([v]) => observer.next(v)); }); }
Sie können es so verwenden:
two.pipe(waitFor(one)) .subscribe(value => ...);
quelle
Nun, ich weiß, dass dies ziemlich alt ist, aber ich denke, dass Sie möglicherweise Folgendes benötigen:
var one = someObservable.take(1); var two = someOtherObservable.pipe( concatMap((twoRes) => one.pipe(mapTo(twoRes))), take(1) ).subscribe((twoRes) => { // one is completed and we get two's subscription. })
quelle
Sie können das Ergebnis von vorherigen Observable dank des Operators mergeMap (oder seines Alias flatMap ) wie folgt verwenden :
const one = Observable.of('https://api.github.com/users'); const two = (c) => ajax(c);//ajax from Rxjs/dom library one.mergeMap(two).subscribe(c => console.log(c))
quelle