Node.js Streams vs. Observables

77

Nachdem ich etwas über Observables gelernt habe , finde ich sie den Streams von Node.j ziemlich ähnlich . Beide haben einen Mechanismus, um den Verbraucher zu benachrichtigen, wenn neue Daten eintreffen, ein Fehler auftritt oder keine Daten mehr vorhanden sind (EOF).

Ich würde gerne etwas über die konzeptionellen / funktionalen Unterschiede zwischen den beiden lernen. Vielen Dank!

urish
quelle
1
@BenjaminGruenbaum Ich frage mich, warum Sie dies mit rxjs und Speck markiert haben? OP scheint sich auf die Observablen aus der Ecmascript-Harmonie
Bergi
@Bergi Vorkenntnisse über OP und die Frage. Grundsätzlich.
Benjamin Gruenbaum
1
Lol herzlichen Glückwunsch zu den positiven Stimmen, aber ich habe keine Ahnung, warum diese Frage nicht geschlossen wurde. Wie ist das eine echte Frage / angemessen für SO.
Alexander Mills
4
@AlexanderMills wie ist das keine passende Frage für SO? Dies ist keine Frage, die "Ihre Lieblingsfrage" ist. Es wird nach den Unterschieden zwischen zwei häufig verwendeten reaktiven Mustern in JS / Node gefragt.
Michael Martin-Smucker

Antworten:

100

Mit den Streams von Observables und node.js können Sie dasselbe zugrunde liegende Problem lösen: eine Folge von Werten asynchron verarbeiten. Der Hauptunterschied zwischen den beiden hängt meines Erachtens mit dem Kontext zusammen, der sein Auftreten motiviert hat. Dieser Kontext spiegelt sich in der Terminologie und API wider.

Auf der Observables- Seite haben Sie eine Erweiterung von EcmaScript, die das reaktive Programmiermodell einführt. Es versucht, die Lücke zwischen Wertschöpfung und Asynchronität mit den minimalistischen und zusammensetzbaren Konzepten von Observerund zu schließen Observable.

Auf der Seite node.js und Streams wollten Sie eine Schnittstelle für die asynchrone und performante Verarbeitung von Netzwerk-Streams und lokalen Dateien erstellen. Die Terminologie stammt aus diesem ersten Kontext und Sie erhalten pipe, chunk, encoding, flush, Duplex, Buffer, etc. Durch einen pragmatischen Ansatz mit , die für bestimmte Anwendungsfälle explizite Unterstützung bietet Ihnen eine gewisse Fähigkeit zu compose Dingen verlieren , weil es nicht so einheitlich ist. Zum Beispiel verwenden Sie pushauf einem ReadableStream und writeauf einem Writableobwohl konzeptionell tun Sie das Gleiche: Wert zu veröffentlichen.

Wenn Sie sich also in der Praxis die Konzepte ansehen und die Option verwenden { objectMode: true }, können Sie Observablemit dem ReadableStream und Observerdem WritableStream übereinstimmen . Sie können sogar einige einfache Adapter zwischen den beiden Modellen erstellen.

var Readable = require('stream').Readable;
var Writable = require('stream').Writable;
var util = require('util');

var Observable = function(subscriber) {
    this.subscribe = subscriber;
}

var Subscription = function(unsubscribe) {
    this.unsubscribe = unsubscribe;
}

Observable.fromReadable = function(readable) {
    return new Observable(function(observer) {
        function nop() {};

        var nextFn = observer.next ? observer.next.bind(observer) : nop;
        var returnFn = observer.return ? observer.return.bind(observer) : nop;
        var throwFn = observer.throw ? observer.throw.bind(observer) : nop;

        readable.on('data', nextFn);
        readable.on('end', returnFn);
        readable.on('error', throwFn);

        return new Subscription(function() {
            readable.removeListener('data', nextFn);
            readable.removeListener('end', returnFn);
            readable.removeListener('error', throwFn);
        });
    });
}

var Observer = function(handlers) {
    function nop() {};

    this.next = handlers.next || nop;
    this.return = handlers.return || nop;
    this.throw = handlers.throw || nop;
}

Observer.fromWritable = function(writable, shouldEnd, throwFn) {
    return new Observer({
        next: writable.write.bind(writable), 
        return: shouldEnd ? writable.end.bind(writable) : function() {}, 
        throw: throwFn
    });
}

Sie haben vielleicht bemerkt, dass ich einige Namen geändert und die einfacheren Konzepte von Observerund verwendet habe Subscription, um die Überlastung der von Observables in Generator. Grundsätzlich Subscriptionkönnen Sie sich mit dem abmelden Observable. Wie auch immer, mit dem obigen Code können Sie eine haben pipe.

Observable.fromReadable(process.stdin).subscribe(Observer.fromWritable(process.stdout));

Im Vergleich process.stdin.pipe(process.stdout)dazu haben Sie eine Möglichkeit, Streams zu kombinieren, zu filtern und zu transformieren, die auch für jede andere Datensequenz funktioniert. Sie können es erreichen mit Readable, Transformund WritableStröme aber die API favorisiert Subklassen anstelle von Verkettungs Readables und Anwendung Funktionen. Im ObservableModell entspricht das Transformieren von Werten beispielsweise dem Anwenden einer Transformatorfunktion auf den Stream. Es ist kein neuer Subtyp von erforderlich Transform.

Observable.just = function(/*... arguments*/) {
    var values = arguments;
    return new Observable(function(observer) {
        [].forEach.call(values, function(value) {
            observer.next(value);
        });
        observer.return();
        return new Subscription(function() {});
    });
};

Observable.prototype.transform = function(transformer) {
    var source = this;
    return new Observable(function(observer) {
        return source.subscribe({
            next: function(v) {
                observer.next(transformer(v));
            },
            return: observer.return.bind(observer),
            throw: observer.throw.bind(observer)
        });
    });
};

Observable.just(1, 2, 3, 4, 5).transform(JSON.stringify)
  .subscribe(Observer.fromWritable(process.stdout))

Das Fazit? Es ist einfach, das reaktive Modell und das ObservableKonzept überall einzuführen . Es ist schwieriger, eine ganze Bibliothek um dieses Konzept herum zu implementieren. All diese kleinen Funktionen müssen konsequent zusammenarbeiten. Immerhin läuft das ReactiveX- Projekt noch. Aber wenn Sie den Dateiinhalt wirklich an den Client senden, sich mit der Codierung befassen und ihn komprimieren müssen, dann ist die Unterstützung in NodeJS vorhanden und funktioniert ziemlich gut.

m4ktub
quelle
5
Ich bin mir wirklich nicht sicher über diese ganze "Erweiterung auf Ecmascript". RxJS ist nur eine Bibliothek, genau wie RxJava usw. In ES7 oder ES8 gibt es möglicherweise einige Schlüsselwörter in ES / JS, die sich auf Observables beziehen, aber sie sind sicherlich nicht Teil der Sprache und sicherlich nicht, wenn Sie die Frage beantwortet haben im Jahr 2015.
Alexander Mills
1
Unterstützt die RX-Implementierung verlustfreien Gegendruck? Wenn beispielsweise nodejs den Stream im angehaltenen Modus lesen, können wir die read()Methode verwenden, um bei Bedarf aus dem Stream zu lesen. Und das drain eventkann signalisieren, dass der beschreibbare Stream mehr Daten empfangen kann.
Buggy