Node.js Leitet denselben lesbaren Stream in mehrere (beschreibbare) Ziele

76

Ich muss zwei Befehle in Reihe ausführen, die Daten aus demselben Stream lesen müssen. Nach dem Weiterleiten eines Streams in einen anderen wird der Puffer geleert, sodass ich keine Daten mehr aus diesem Stream lesen kann, sodass dies nicht funktioniert:

var spawn = require('child_process').spawn;
var fs = require('fs');
var request = require('request');

var inputStream = request('http://placehold.it/640x360');
var identify = spawn('identify',['-']);

inputStream.pipe(identify.stdin);

var chunks = [];
identify.stdout.on('data',function(chunk) {
  chunks.push(chunk);
});

identify.stdout.on('end',function() {
  var size = getSize(Buffer.concat(chunks)); //width
  var convert = spawn('convert',['-','-scale',size * 0.5,'png:-']);
  inputStream.pipe(convert.stdin);
  convert.stdout.pipe(fs.createWriteStream('half.png'));
});

function getSize(buffer){
  return parseInt(buffer.toString().split(' ')[2].split('x')[0]);
}

Anfrage beschwert sich darüber

Error: You cannot pipe after data has been emitted from the response.

und Ändern des inputStream , fs.createWriteStreamum natürlich das gleiche Problem zu erhalten. Ich möchte nicht in eine Datei schreiben, sondern den von dieser Anforderung erzeugten Stream (oder einen anderen) in irgendeiner Weise wiederverwenden .

Gibt es eine Möglichkeit, einen lesbaren Stream nach Abschluss der Rohrleitungen wiederzuverwenden? Was wäre der beste Weg, um so etwas wie das obige Beispiel zu erreichen?

Maroshii
quelle
Scheint, als würden Sie Imagemick verwenden. Sie können einen Wert wie 50% für die Skalierung an -scale übergeben. Sie können auch npmjs.org/package/gm
user568109
2
@ user568109 Ja. Das ist hier jedoch nicht das Problem. Es ist eine allgemeinere Frage ... es ist imaginär, wie es jeder andere Befehl / Stream sein könnte
Maroshii

Antworten:

83

Sie müssen ein Duplikat des Streams erstellen, indem Sie es an zwei Streams weiterleiten. Sie können einen einfachen Stream mit einem PassThrough-Stream erstellen. Er übergibt einfach die Eingabe an die Ausgabe.

const spawn = require('child_process').spawn;
const PassThrough = require('stream').PassThrough;

const a = spawn('echo', ['hi user']);
const b = new PassThrough();
const c = new PassThrough();

a.stdout.pipe(b);
a.stdout.pipe(c);

let count = 0;
b.on('data', function (chunk) {
  count += chunk.length;
});
b.on('end', function () {
  console.log(count);
  c.pipe(process.stdout);
});

Ausgabe:

8
hi user
user568109
quelle
5
Verwendete diese Technik mit den Haraka-Mailserver-Anhangs-Hooks, um den eingehenden Stream in mehrere E-Mail-Kontodatenbanken zu leiten. Diese Antwort funktioniert.
17
Beachten Sie, dass diese Technik nur funktioniert, wenn der erzeugte Befehl eine Anzahl von Bytes ausgibt, die die Gegendruckpuffer nicht füllen. Sie können versuchen, es mit a = spawn ('head', ['-c', '200K', '/ dev / urandom']) zum Scheitern zu bringen. Wenn c nicht herausgeführt wird, unterbricht a.stdout irgendwann das Herausleiten. b wird abfließen und niemals enden.
Jerome WAGNER
44
Ich bin verwirrt, Sie sagen, dass Sie denselben Stream nicht zweimal verarbeiten können, aber Ihre Lösung besteht darin, denselben Stream zweimal zu verarbeiten (mit der PassThrough-Transformation). Dies scheint widersprüchlich. Ist das etwas Besonderes an den Standard-Streams?
BT
7
Ich habe das getestet und es funktioniert auf jeden Fall. Ich denke, es ist nicht richtig, wenn Sie sagen "Sie können denselben Stream nicht zweimal verarbeiten", da Sie dies tun. Ihre ersten Aussagen darüber, dass ein Stream nach seinem "Ende" nicht weitergeleitet werden kann, sind der richtige Grund.
BT
6
Verwenden Sie diese Methode nicht, da dies zu Problemen führt, wenn die Streams mit unterschiedlichen Raten gelesen werden. Versuchen Sie dies stattdessen. Npmjs.com/package/readable-stream-clone hat bei mir gut funktioniert.
Kiwicomb123
12

Die erste Antwort funktioniert nur, wenn Streams ungefähr genauso viel Zeit für die Datenverarbeitung benötigen. Wenn man wesentlich länger braucht, fordert der Schnellere neue Daten an und überschreibt folglich die Daten, die noch von dem Langsameren verwendet werden (ich hatte dieses Problem, nachdem ich versucht hatte, es mit einem doppelten Stream zu lösen).

Das folgende Muster hat bei mir sehr gut funktioniert. Es verwendet eine Bibliothek, die auf Stream2-Streams, Streamz und Promises basiert, um asynchrone Streams über einen Rückruf zu synchronisieren. Verwenden Sie das bekannte Beispiel aus der ersten Antwort:

spawn = require('child_process').spawn;
pass = require('stream').PassThrough;
streamz = require('streamz').PassThrough;
var Promise = require('bluebird');

a = spawn('echo', ['hi user']);
b = new pass;
c = new pass;   

a.stdout.pipe(streamz(combineStreamOperations)); 

function combineStreamOperations(data, next){
  Promise.join(b, c, function(b, c){ //perform n operations on the same data
  next(); //request more
}

count = 0;
b.on('data', function(chunk) { count += chunk.length; });
b.on('end', function() { console.log(count); c.pipe(process.stdout); });
Artikas
quelle
Welcher Teil überschreibt tatsächlich die Daten? Der Code, der überschreibt, sollte natürlich einen Fehler auslösen.
Robert Siemer
2

Was ist mit der gleichzeitigen Leitung in zwei oder mehr Streams?

Zum Beispiel :

var PassThrough = require('stream').PassThrough;
var mybiraryStream = stream.start(); //never ending audio stream
var file1 = fs.createWriteStream('file1.wav',{encoding:'binary'})
var file2 = fs.createWriteStream('file2.wav',{encoding:'binary'})
var mypass = PassThrough
mybinaryStream.pipe(mypass)
mypass.pipe(file1)
setTimeout(function(){
   mypass.pipe(file2);
},2000)

Der obige Code erzeugt keine Fehler, aber die Datei2 ist leer

user3683370
quelle
irgendwie hilft es mir!
Sandip
5
Ich denke, Sie haben ein Problem identifiziert, aber es ist verwirrend, weil dies keine Antwort ist.
Michael
1

Bei allgemeinen Problemen funktioniert der folgende Code einwandfrei

var PassThrough = require('stream').PassThrough
a=PassThrough()
b1=PassThrough()
b2=PassThrough()
a.pipe(b1)
a.pipe(b2)
b1.on('data', function(data) {
  console.log('b1:', data.toString())
})
b2.on('data', function(data) {
  console.log('b2:', data.toString())
})
a.write('text')
Jake
quelle
1

Ich habe eine andere Lösung, um gleichzeitig in zwei Streams zu schreiben. Die Zeit zum Schreiben ist natürlich die Addition der beiden Male, aber ich verwende sie, um auf eine Download-Anfrage zu antworten, bei der ich eine Kopie der heruntergeladenen Datei behalten möchte mein Server (eigentlich verwende ich eine S3-Sicherung, also speichere ich die am häufigsten verwendeten Dateien lokal zwischen, um mehrere Dateiübertragungen zu vermeiden)

/**
 * A utility class made to write to a file while answering a file download request
 */
class TwoOutputStreams {
  constructor(streamOne, streamTwo) {
    this.streamOne = streamOne
    this.streamTwo = streamTwo
  }

  setHeader(header, value) {
    if (this.streamOne.setHeader)
      this.streamOne.setHeader(header, value)
    if (this.streamTwo.setHeader)
      this.streamTwo.setHeader(header, value)
  }

  write(chunk) {
    this.streamOne.write(chunk)
    this.streamTwo.write(chunk)
  }

  end() {
    this.streamOne.end()
    this.streamTwo.end()
  }
}

Sie können dies dann als regulären OutputStream verwenden

const twoStreamsOut = new TwoOutputStreams(fileOut, responseStream)

und übergeben Sie es an Ihre Methode, als wäre es eine Antwort oder ein fileOutputStream

Zied Hamdi
quelle
1

Wenn Sie asynchrone Vorgänge für die PassThrough-Streams ausführen, funktionieren die hier veröffentlichten Antworten nicht. Eine Lösung, die für asynchrone Vorgänge funktioniert, umfasst das Puffern des Stream-Inhalts und das anschließende Erstellen von Streams aus dem gepufferten Ergebnis.

  1. Um das Ergebnis zu puffern, können Sie concat-stream verwenden

    const Promise = require('bluebird');
    const concat = require('concat-stream');
    const getBuffer = function(stream){
        return new Promise(function(resolve, reject){
            var gotBuffer = function(buffer){
                resolve(buffer);
            }
            var concatStream = concat(gotBuffer);
            stream.on('error', reject);
            stream.pipe(concatStream);
        });
    }
    
  2. So erstellen Sie Streams aus dem Puffer:

    const { Readable } = require('stream');
    const getBufferStream = function(buffer){
        const stream = new Readable();
        stream.push(buffer);
        stream.push(null);
        return Promise.resolve(stream);
    }
    
Juan
quelle
1

Sie können dieses kleine npm-Paket verwenden, das ich erstellt habe:

readable-stream-clone

Damit können Sie lesbare Streams beliebig oft wiederverwenden

levansuper
quelle