RxJS: Wie würde ich ein Observable "manuell" aktualisieren?

139

Ich denke, ich muss etwas Grundlegendes falsch verstehen, denn in meinen Augen sollte dies der grundlegendste Fall für ein Beobachtbares sein, aber für mein Leben kann ich aus den Dokumenten nicht herausfinden, wie es geht.

Grundsätzlich möchte ich dazu in der Lage sein:

// create a dummy observable, which I would update manually
var eventObservable = rx.Observable.create(function(observer){});
var observer = eventObservable.subscribe(
   function(x){
     console.log('next: ' + x);
   }
...
var my_function = function(){
  eventObservable.push('foo'); 
  //'push' adds an event to the datastream, the observer gets it and prints 
  // next: foo
}

Aber ich konnte keine Methode wie diese finden push. Ich verwende dies für einen Klick-Handler, und ich weiß, dass sie dafür haben Observable.fromEvent, aber ich versuche, es mit React zu verwenden, und ich möchte lieber einfach den Datenstrom in einem Rückruf aktualisieren, anstatt einen völlig anderen zu verwenden Ereignisbehandlungssystem. Im Grunde will ich das:

$( "#target" ).click(function(e) {
  eventObservable.push(e.target.text()); 
});

Das nächste, was ich bekam, war das Verwenden observer.onNext('foo'), aber das schien nicht wirklich zu funktionieren, und das fordert den Beobachter auf, was nicht richtig zu sein scheint. Der Beobachter sollte das Ding sein, das auf den Datenstrom reagiert und ihn nicht ändert, oder?

Verstehe ich die Beziehung zwischen Beobachter und Beobachtbarem einfach nicht?

LiamD
quelle
Schauen Sie sich dies an, um Ihre Idee zu verdeutlichen (Die Einführung in die reaktive Programmierung, die Sie vermisst haben): gist.github.com/staltz/868e7e9bc2a7b8c1f754 . Auch hier gibt es eine Reihe von Ressourcen, mit denen Sie Ihr Verständnis verbessern können: github.com/Reactive-Extensions/RxJS#resources
user3743222
Ich hatte das erste ausgecheckt, scheint eine solide Ressource zu sein. Die zweite ist eine großartige Liste, auf der ich aaronstacy.com/writings/reactive-programming-and-mvc gefunden habe, die mir geholfen hat, Rx.Subject zu entdecken, das mein Problem löst. So danke! Sobald ich ein bisschen mehr App geschrieben habe, werde ich meine Lösung veröffentlichen. Ich möchte sie nur ein bisschen im Kampf testen.
LiamD
Hehe, vielen Dank, dass Sie diese Frage gestellt haben. Ich wollte gerade dieselbe Frage mit demselben Codebeispiel stellen :-)
arturh

Antworten:

154

In RX sind Observer und Observable unterschiedliche Einheiten. Ein Beobachter abonniert ein Observable. Ein Observable sendet Elemente an seine Beobachter, indem es die Methoden der Beobachter aufruft. Wenn Sie die Beobachtermethoden außerhalb Ihres Anwendungsbereichs aufrufen müssen, Observable.create()können Sie einen Betreff verwenden, bei dem es sich um einen Proxy handelt, der gleichzeitig als Beobachter und beobachtbar fungiert.

Sie können dies tun:

var eventStream = new Rx.Subject();

var subscription = eventStream.subscribe(
   function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });

var my_function = function() {
  eventStream.next('foo'); 
}

Weitere Informationen zu Themen finden Sie hier:

luisgabriel
quelle
1
Genau das habe ich getan! Ich habe weiter daran gearbeitet, um herauszufinden, ob ich einen besseren Weg finden kann, um das zu tun, was ich tun muss, aber dies ist definitiv eine praktikable Lösung. Ich habe es zuerst in diesem Artikel gesehen: aaronstacy.com/writings/reactive-programming-and-mvc .
LiamD
34

Ich glaube, er Observable.create()nimmt keinen Beobachter als Rückrufparameter, sondern einen Emitter. Wenn Sie also Ihrem Observable einen neuen Wert hinzufügen möchten, versuchen Sie stattdessen Folgendes :

var emitter;
var observable = Rx.Observable.create(e => emitter = e);
var observer = {
  next: function(next) {
    console.log(next);
  },
  error: function(error) {
    console.log(error);
  },
  complete: function() {
    console.log("done");
  }
}
observable.subscribe(observer);
emitter.next('foo');
emitter.next('bar');
emitter.next('baz');
emitter.complete();

//console output
//"foo"
//"bar"
//"baz"
//"done"

Ja, Betreff macht es einfacher, Observable und Observer im selben Objekt bereitzustellen, aber es ist nicht genau dasselbe, da Subject es Ihnen ermöglicht, mehrere Beobachter derselben Observable zu abonnieren, wenn eine Observable nur Daten an den zuletzt abonnierten Beobachter sendet. Verwenden Sie sie daher bewusst . Hier ist ein JsBin, wenn Sie daran basteln möchten.

theFreedomBanana
quelle
Ist die Möglichkeit, Emitter-Eigenschaften zu überschreiben, irgendwo in RxJS-Handbüchern dokumentiert?
Tomas
In diesem Fall emitterwerden nur next()neue Werte für den Beobachter angezeigt, der den letzten abonniert hat. Ein besserer Ansatz wäre, alle emitters in einem Array zu sammeln und sie alle und nextden Wert auf jedem von ihnen zu
durchlaufen