Wie erhalte ich den aktuellen Wert von RxJS Subject oder Observable?

206

Ich habe einen Angular 2-Dienst:

import {Storage} from './storage';
import {Injectable} from 'angular2/core';
import {Subject}    from 'rxjs/Subject';

@Injectable()
export class SessionStorage extends Storage {
  private _isLoggedInSource = new Subject<boolean>();
  isLoggedIn = this._isLoggedInSource.asObservable();
  constructor() {
    super('session');
  }
  setIsLoggedIn(value: boolean) {
    this.setItem('_isLoggedIn', value, () => {
      this._isLoggedInSource.next(value);
    });
  }
}

Alles funktioniert super. Aber ich habe eine andere Komponente, die nicht abonniert werden muss, sondern nur den aktuellen Wert von isLoggedIn zu einem bestimmten Zeitpunkt abrufen muss. Wie kann ich das machen?

Baconbeastnz
quelle

Antworten:

341

A Subjectoder Observablehat keinen aktuellen Wert. Wenn ein Wert ausgegeben wird, wird er an Abonnenten übergeben und das Observableist damit erledigt.

Wenn Sie einen aktuellen Wert haben möchten, verwenden Sie diesen, BehaviorSubjectder genau für diesen Zweck entwickelt wurde. BehaviorSubjectbehält den zuletzt ausgegebenen Wert bei und gibt ihn sofort an neue Abonnenten weiter.

Es gibt auch eine Methode getValue(), um den aktuellen Wert abzurufen.

Günter Zöchbauer
quelle
1
Hallo, es ist mein schlechtes, sie sind jetzt das gleiche in den rxjs 5. Der obige Quellcode-Link bezog sich auf rxjs4.
Schrödingers Code
84
Wichtiger Hinweis des Autors von RxJS 5: Wenn Sie getValue()eine RIESIGE rote Fahne verwenden, machen Sie etwas falsch. Es ist dort als Notluke. Im Allgemeinen sollte alles, was Sie mit RxJS tun, deklarativ sein. getValue()ist zwingend erforderlich. Wenn Sie verwenden getValue(), besteht eine Wahrscheinlichkeit von 99,9%, dass Sie etwas falsch oder seltsam machen.
Ben Lesh
1
@ BenLesh was ist, wenn ich ein habe BehaviorSubject<boolean>(false)und es gerne umschalten möchte?
Bob
3
@BenLesh getValue () ist sehr nützlich, um beispielsweise eine sofortige Aktion auszuführen, z. B. onClick-> dispatchToServer (x.getValue ()), aber nicht in beobachtbaren Ketten.
FlavorScape
2
@ IngoBürk Der einzige Weg, wie ich Ihren Vorschlag rechtfertigen kann, ist, wenn Sie nicht wussten, dass dies foo$ein ist BehaviorSubject(dh es ist definiert als Observableund vielleicht ist es heiß oder kalt von einer zurückgestellten Quelle). Jedoch , da Sie dann Gebrauch machen .nextauf $foosich selbst , das bedeutet , dass Ihre Implementierung beruht auf es ist eine BehaviorSubjectso gibt es keine Rechtfertigung für die nicht nur mit .valueden aktuellen Wert an erster Stelle zu bekommen.
Simon_Weaver
144

Der einzige Weg , Sie sollten Werte „aus“ eine beobachtbare / Fach bekommen ist mit abonnieren!

Wenn Sie verwenden getValue(), tun Sie etwas Notwendiges im deklarativen Paradigma. Es ist dort als Notluke, aber 99,9% der Zeit sollten Sie NICHT verwenden getValue(). Es gibt ein paar interessante Dinge, getValue()die zu tun sind: Es wird ein Fehler ausgegeben, wenn das Thema abgemeldet wurde, es wird verhindert, dass Sie einen Wert erhalten, wenn das Thema tot ist, weil es fehlerhaft ist usw. Aber es ist wieder als Flucht da Luke für seltene Umstände.

Es gibt verschiedene Möglichkeiten, um den neuesten Wert von einem Subjekt oder Observable auf "Rx-y" Weise zu erhalten:

  1. Verwenden BehaviorSubject: Aber tatsächlich abonnieren . Wenn Sie es zum ersten Mal abonnieren, BehaviorSubjectwird synchron der vorherige Wert gesendet, den es empfangen hat oder mit dem es initialisiert wurde.
  2. Verwenden von a ReplaySubject(N): Dadurch werden NWerte zwischengespeichert und an neue Abonnenten wiedergegeben.
  3. A.withLatestFrom(B): Verwenden Sie diesen Operator, um den neuesten Wert von Observable zu erhalten, Bwenn Observable Aemittiert. Gibt Ihnen beide Werte in einem Array [a, b].
  4. A.combineLatest(B): Verwenden Sie die aktuellsten Werte von bekommen dieser Operator Aund Bjedes Mal entweder Aoder Baussendet. Gibt Ihnen beide Werte in einem Array.
  5. shareReplay(): Erstellt ein Observable-Multicast über a ReplaySubject, ermöglicht es Ihnen jedoch, das Observable bei einem Fehler erneut zu versuchen. (Grundsätzlich gibt es Ihnen das vielversprechende Caching-Verhalten).
  6. publishReplay(), publishBehavior(initialValue), multicast(subject: BehaviorSubject | ReplaySubject), Etc: Andere Betreiber , die Hebelwirkung BehaviorSubjectund ReplaySubject. Verschiedene Geschmacksrichtungen derselben Sache, sie senden im Grunde genommen die beobachtbare Quelle per Multicast, indem sie alle Benachrichtigungen durch ein Thema leiten. Sie müssen anrufen connect(), um die Quelle mit dem Betreff zu abonnieren.
Ben Lesh
quelle
19
Können Sie erläutern, warum die Verwendung von getValue () eine rote Fahne ist? Was ist, wenn ich den aktuellen Wert des Observable nur einmal benötige, sobald der Benutzer auf eine Schaltfläche klickt? Soll ich den Wert abonnieren und mich sofort abmelden?
Flamme
5
Manchmal ist es okay. Aber oft ist es ein Zeichen dafür, dass der Autor des Codes etwas sehr Gebotvolles tut (normalerweise mit Nebenwirkungen), wo es nicht sein sollte. Sie können auch click$.mergeMap(() => behaviorSubject.take(1))Ihr Problem lösen.
Ben Lesh
1
Tolle Erklärung! Wenn Sie Observable behalten und die Methoden verwenden können, ist dies ein viel besserer Weg. In einem Typoskriptkontext, in dem Observable überall verwendet wird, aber nur wenige als BehaviourSubject definiert sind, wäre es weniger konsistenter Code. Mit den von Ihnen vorgeschlagenen Methoden konnte ich überall beobachtbare Typen behalten.
JLavoie
2
Es ist gut, wenn Sie einfach den aktuellen Wert versenden möchten (z. B. per Klick an den Server senden). Ein getValue () ist sehr praktisch. Verwenden Sie es jedoch nicht, wenn Sie beobachtbare Operatoren verketten.
FlavorScape
1
Ich habe eine, AuthenticationServicedie a verwendet BehaviourSubject, um den aktuell angemeldeten Status ( boolean trueoder false) zu speichern . Es stellt ein isLoggedIn$Observable für Abonnenten bereit, die wissen möchten, wann sich der Status ändert. Außerdem wird eine get isLoggedIn()Eigenschaft verfügbar gemacht , die den aktuellen angemeldeten Status durch Aufrufen getValue()des Basiswerts zurückgibt. BehaviourSubjectDies wird von meinem Authentifizierungsschutz verwendet, um den aktuellen Status zu überprüfen. Dies scheint getValue()mir eine vernünftige Verwendung zu sein ...?
Dan King
11

Ich hatte eine ähnliche Situation, in der verspätete Abonnenten das Thema abonnierten, nachdem sein Wert angekommen war.

Ich habe festgestellt, dass ReplaySubject, das BehaviorSubject ähnelt, in diesem Fall wie ein Zauber wirkt. Und hier ist ein Link zur besseren Erklärung: http://reactivex.io/rxjs/manual/overview.html#replaysubject

Kfir Erez
quelle
1
Das hat mir in meiner Angular4-App geholfen - ich musste das Abonnement vom Komponentenkonstruktor auf ngOnInit () verschieben (diese Komponente wird über mehrere Routen hinweg gemeinsam genutzt) und nur hier abreisen, falls jemand ein ähnliches Problem hat
Luca
1
Ich hatte ein Problem mit einer Angular 5-App, bei der ich einen Dienst verwendete, um Werte von einer API abzurufen und Variablen in verschiedenen Komponenten festzulegen. Ich habe Subjekte / Observable verwendet, aber die Werte wurden nach einer Routenänderung nicht verschoben. ReplaySubject war ein Ersatz für Subject und löste alles.
Jafaircl
1
Stellen Sie sicher, dass Sie ReplaySubject (1) verwenden, da sonst neue Abonnenten jeden zuvor ausgegebenen Wert nacheinander erhalten - dies ist zur Laufzeit nicht immer offensichtlich
Drenai
@ Drenai soweit ich verstehe ReplaySubject (1) verhalten sich genauso wie BehaviorSubject ()
Kfir Erez
Nicht ganz dasselbe, der große Unterschied besteht darin, dass ReplaySubject beim Abonnieren nicht sofort einen Standardwert ausgibt, wenn seine next()Funktion noch nicht aufgerufen wurde, wohingegen BehaviourSubject dies tut. Die sofortige Ausgabe ist sehr praktisch, um Standardwerte auf eine Ansicht anzuwenden, wenn das BehaviourSubject als Datenquelle verwendet wird, die beispielsweise in einem Dienst beobachtet werden kann
Drenai
4
const observable = of('response')

function hasValue(value: any) {
  return value !== null && value !== undefined;
}

function getValue<T>(observable: Observable<T>): Promise<T> {
  return observable
    .pipe(
      filter(hasValue),
      first()
    )
    .toPromise();
}

const result = await getValue(observable)
// Do the logic with the result
// .................
// .................
// .................

Den vollständigen Artikel zur Implementierung finden Sie hier. https://www.imkrish.com/how-to-get-current-value-of-observable-in-a-clean-way/

Keerati Limkulphong
quelle
3

Ich hatte das gleiche Problem bei untergeordneten Komponenten, bei denen zunächst der aktuelle Wert des Betreffs angegeben und dann der Betreff abonniert werden musste, um Änderungen zu hören. Ich behalte nur den aktuellen Wert im Service bei, damit Komponenten darauf zugreifen können, z. B.:

import {Storage} from './storage';
import {Injectable} from 'angular2/core';
import {Subject}    from 'rxjs/Subject';

@Injectable()
export class SessionStorage extends Storage {

  isLoggedIn: boolean;

  private _isLoggedInSource = new Subject<boolean>();
  isLoggedIn = this._isLoggedInSource.asObservable();
  constructor() {
    super('session');
    this.currIsLoggedIn = false;
  }
  setIsLoggedIn(value: boolean) {
    this.setItem('_isLoggedIn', value, () => {
      this._isLoggedInSource.next(value);
    });
    this.isLoggedIn = value;
  }
}

Eine Komponente, die den aktuellen Wert benötigt, kann dann über den Dienst darauf zugreifen, dh:

sessionStorage.isLoggedIn

Ich bin mir nicht sicher, ob dies die richtige Praxis ist :)

Molp Burnbright
quelle
2
Wenn Sie den Wert eines Observable in Ihrer Komponentenansicht benötigen, können Sie einfach die asyncPipe verwenden.
Ingo Bürk
2

Eine ähnlich aussehende Antwort wurde abgelehnt. Aber ich denke, ich kann rechtfertigen, was ich hier für begrenzte Fälle vorschlage.


Es stimmt zwar, dass ein Observable keinen aktuellen Wert hat, aber sehr oft hat es einen sofort verfügbaren Wert. Beispielsweise können Sie bei Redux / Flux / Akita-Speichern Daten von einem zentralen Speicher anfordern, die auf einer Reihe von Observablen basieren, und dieser Wert ist im Allgemeinen sofort verfügbar.

Wenn dies der Fall ist subscribe, wird der Wert sofort wieder angezeigt.

Nehmen wir also an, Sie haben einen Anruf bei einem Dienstleister erhalten und möchten nach Abschluss den neuesten Wert von etwas aus Ihrem Geschäft abrufen, das möglicherweise nicht ausgegeben wird :

Sie könnten versuchen, dies zu tun (und Sie sollten die Dinge so weit wie möglich in Rohren aufbewahren):

 serviceCallResponse$.pipe(withLatestFrom(store$.select(x => x.customer)))
                     .subscribe(([ serviceCallResponse, customer] => {

                        // we have serviceCallResponse and customer 
                     });

Das Problem dabei ist, dass es blockiert, bis das sekundäre Observable einen Wert ausgibt, der möglicherweise niemals sein könnte.

Vor kurzem musste ich ein Observable nur dann bewerten, wenn ein Wert sofort verfügbar war , und was noch wichtiger war, ich musste erkennen können, ob dies nicht der Fall war. Am Ende habe ich das gemacht:

 serviceCallResponse$.pipe()
                     .subscribe(serviceCallResponse => {

                        // immediately try to subscribe to get the 'available' value
                        // note: immediately unsubscribe afterward to 'cancel' if needed
                        let customer = undefined;

                        // whatever the secondary observable is
                        const secondary$ = store$.select(x => x.customer);

                        // subscribe to it, and assign to closure scope
                        sub = secondary$.pipe(take(1)).subscribe(_customer => customer = _customer);
                        sub.unsubscribe();

                        // if there's a delay or customer isn't available the value won't have been set before we get here
                        if (customer === undefined) 
                        {
                           // handle, or ignore as needed
                           return throwError('Customer was not immediately available');
                        }
                     });

Beachten Sie, dass ich für alle oben genannten subscribePunkte den Wert erhalte (wie in @Ben erläutert). Ich benutze keine .valueImmobilie, auch wenn ich eine hatte BehaviorSubject.

Simon_Weaver
quelle
1
Übrigens funktioniert dies, weil 'Zeitplan' standardmäßig den aktuellen Thread verwendet.
Simon_Weaver
1

Obwohl es übertrieben klingen mag, ist dies nur eine weitere "mögliche" Lösung, um den Observable- Typ beizubehalten und die Boilerplate zu reduzieren ...

Sie können jederzeit einen Erweiterungs-Getter erstellen , um den aktuellen Wert eines Observable abzurufen.

Dazu müssten Sie die Observable<T>Schnittstelle in einer global.d.tsTypisierungsdeklarationsdatei erweitern. Implementieren Sie dann den Erweiterungs-Getter in eine observable.extension.tsDatei und fügen Sie Ihrer Anwendung schließlich sowohl Typisierungen als auch Erweiterungsdateien hinzu.

In dieser StackOverflow-Antwort erfahren Sie, wie Sie die Erweiterungen in Ihre Angular-Anwendung aufnehmen können.

// global.d.ts
declare module 'rxjs' {
  interface Observable<T> {
    /**
     * _Extension Method_ - Returns current value of an Observable.
     * Value is retrieved using _first()_ operator to avoid the need to unsubscribe.
     */
    value: Observable<T>;
  }
}

// observable.extension.ts
Object.defineProperty(Observable.prototype, 'value', {
  get <T>(this: Observable<T>): Observable<T> {
    return this.pipe(
      filter(value => value !== null && value !== undefined),
      first());
  },
});

// using the extension getter example
this.myObservable$.value
  .subscribe(value => {
    // whatever code you need...
  });
j3ff
quelle
0

Sie können den zuletzt ausgegebenen Wert getrennt vom Observable speichern. Dann lesen Sie es bei Bedarf.

let lastValue: number;

const subscription = new Service().start();
subscription
    .subscribe((data) => {
        lastValue = data;
    }
);
Slawa
quelle
1
Es ist kein reaktiver Ansatz, einige Dinge außerhalb des Beobachtbaren zu speichern. Stattdessen sollten so viele Daten wie möglich in den beobachtbaren Streams fließen.
Ganqqwerty
0

Der beste Weg, dies zu tun, ist die Verwendung Behaviur Subjecteines Beispiels:

var sub = new rxjs.BehaviorSubject([0, 1])
sub.next([2, 3])
setTimeout(() => {sub.next([4, 5])}, 1500)
sub.subscribe(a => console.log(a)) //2, 3 (current value) -> wait 2 sec -> 4, 5
yaya
quelle
0

Ein Abonnement kann erstellt und nach Einnahme des ersten emittierten Gegenstands zerstört werden. Pipe ist eine Funktion, die eine Observable als Eingabe verwendet und eine andere Observable als Ausgabe zurückgibt, ohne die erste Observable zu ändern. Winkel 8.1.0. Pakete : "rxjs": "6.5.3","rxjs-observable": "0.0.7"

  ngOnInit() {

    ...

    // If loading with previously saved value
    if (this.controlValue) {

      // Take says once you have 1, then close the subscription
      this.selectList.pipe(take(1)).subscribe(x => {
        let opt = x.find(y => y.value === this.controlValue);
        this.updateValue(opt);
      });

    }
  }
SushiGuy
quelle