Nachdem ich etwas über Observables gelernt habe , finde ich sie den Streams von Node.j ziemlich ähnlich . Beide haben einen Mechanismus, um den Verbraucher zu benachrichtigen, wenn neue Daten eintreffen, ein Fehler auftritt oder keine Daten mehr vorhanden sind (EOF).
Ich würde gerne etwas über die konzeptionellen / funktionalen Unterschiede zwischen den beiden lernen. Vielen Dank!
Antworten:
Mit den Streams von Observables und node.js können Sie dasselbe zugrunde liegende Problem lösen: eine Folge von Werten asynchron verarbeiten. Der Hauptunterschied zwischen den beiden hängt meines Erachtens mit dem Kontext zusammen, der sein Auftreten motiviert hat. Dieser Kontext spiegelt sich in der Terminologie und API wider.
Auf der Observables- Seite haben Sie eine Erweiterung von EcmaScript, die das reaktive Programmiermodell einführt. Es versucht, die Lücke zwischen Wertschöpfung und Asynchronität mit den minimalistischen und zusammensetzbaren Konzepten von
Observer
und zu schließenObservable
.Auf der Seite node.js und Streams wollten Sie eine Schnittstelle für die asynchrone und performante Verarbeitung von Netzwerk-Streams und lokalen Dateien erstellen. Die Terminologie stammt aus diesem ersten Kontext und Sie erhalten
pipe
,chunk
,encoding
,flush
,Duplex
,Buffer
, etc. Durch einen pragmatischen Ansatz mit , die für bestimmte Anwendungsfälle explizite Unterstützung bietet Ihnen eine gewisse Fähigkeit zu compose Dingen verlieren , weil es nicht so einheitlich ist. Zum Beispiel verwenden Siepush
auf einemReadable
Stream undwrite
auf einemWritable
obwohl konzeptionell tun Sie das Gleiche: Wert zu veröffentlichen.Wenn Sie sich also in der Praxis die Konzepte ansehen und die Option verwenden
{ objectMode: true }
, können SieObservable
mit demReadable
Stream undObserver
demWritable
Stream übereinstimmen . Sie können sogar einige einfache Adapter zwischen den beiden Modellen erstellen.var Readable = require('stream').Readable; var Writable = require('stream').Writable; var util = require('util'); var Observable = function(subscriber) { this.subscribe = subscriber; } var Subscription = function(unsubscribe) { this.unsubscribe = unsubscribe; } Observable.fromReadable = function(readable) { return new Observable(function(observer) { function nop() {}; var nextFn = observer.next ? observer.next.bind(observer) : nop; var returnFn = observer.return ? observer.return.bind(observer) : nop; var throwFn = observer.throw ? observer.throw.bind(observer) : nop; readable.on('data', nextFn); readable.on('end', returnFn); readable.on('error', throwFn); return new Subscription(function() { readable.removeListener('data', nextFn); readable.removeListener('end', returnFn); readable.removeListener('error', throwFn); }); }); } var Observer = function(handlers) { function nop() {}; this.next = handlers.next || nop; this.return = handlers.return || nop; this.throw = handlers.throw || nop; } Observer.fromWritable = function(writable, shouldEnd, throwFn) { return new Observer({ next: writable.write.bind(writable), return: shouldEnd ? writable.end.bind(writable) : function() {}, throw: throwFn }); }
Sie haben vielleicht bemerkt, dass ich einige Namen geändert und die einfacheren Konzepte von
Observer
und verwendet habeSubscription
, um die Überlastung der von Observables inGenerator
. GrundsätzlichSubscription
können Sie sich mit dem abmeldenObservable
. Wie auch immer, mit dem obigen Code können Sie eine habenpipe
.Im Vergleich
process.stdin.pipe(process.stdout)
dazu haben Sie eine Möglichkeit, Streams zu kombinieren, zu filtern und zu transformieren, die auch für jede andere Datensequenz funktioniert. Sie können es erreichen mitReadable
,Transform
undWritable
Ströme aber die API favorisiert Subklassen anstelle von VerkettungsReadable
s und Anwendung Funktionen. ImObservable
Modell entspricht das Transformieren von Werten beispielsweise dem Anwenden einer Transformatorfunktion auf den Stream. Es ist kein neuer Subtyp von erforderlichTransform
.Observable.just = function(/*... arguments*/) { var values = arguments; return new Observable(function(observer) { [].forEach.call(values, function(value) { observer.next(value); }); observer.return(); return new Subscription(function() {}); }); }; Observable.prototype.transform = function(transformer) { var source = this; return new Observable(function(observer) { return source.subscribe({ next: function(v) { observer.next(transformer(v)); }, return: observer.return.bind(observer), throw: observer.throw.bind(observer) }); }); }; Observable.just(1, 2, 3, 4, 5).transform(JSON.stringify) .subscribe(Observer.fromWritable(process.stdout))
Das Fazit? Es ist einfach, das reaktive Modell und das
Observable
Konzept überall einzuführen . Es ist schwieriger, eine ganze Bibliothek um dieses Konzept herum zu implementieren. All diese kleinen Funktionen müssen konsequent zusammenarbeiten. Immerhin läuft das ReactiveX- Projekt noch. Aber wenn Sie den Dateiinhalt wirklich an den Client senden, sich mit der Codierung befassen und ihn komprimieren müssen, dann ist die Unterstützung in NodeJS vorhanden und funktioniert ziemlich gut.quelle
read()
Methode verwenden, um bei Bedarf aus dem Stream zu lesen. Und dasdrain event
kann signalisieren, dass der beschreibbare Stream mehr Daten empfangen kann.