Verbinden eines Eingabestreams mit einem Ausgabestream

76

Update in Java9: https://docs.oracle.com/javase/9/docs/api/java/io/InputStream.html#transferTo-java.io.OutputStream-

Ich habe einige ähnliche, aber nicht ganz das gesehen, was ich brauche.

Ich habe einen Server, der im Grunde Eingaben von einem Client, Client A, nimmt und diese Byte für Byte an einen anderen Client, Client B, weiterleitet.

Ich möchte meinen Eingabestream von Client A mit meinem Ausgabestream von Client B verbinden. Ist das möglich? Wie geht das?

Außerdem senden sich diese Clients gegenseitig Nachrichten, die etwas zeitkritisch sind, sodass das Puffern nicht ausreicht. Ich möchte keinen Puffer von beispielsweise 500 und ein Client sendet 499 Bytes. Dann hält mein Server die Weiterleitung der 500 Bytes zurück, da er nicht das letzte Byte empfangen hat, das den Puffer füllt.

Im Moment analysiere ich jede Nachricht, um ihre Länge zu ermitteln, lese dann Längenbytes und leite sie weiter. Ich dachte (und testete), dass dies besser wäre, als ein Byte zu lesen und ein Byte immer wieder weiterzuleiten, da dies sehr langsam wäre. Ich wollte aus dem Grund, den ich in meinem letzten Absatz angegeben habe, auch keinen Puffer oder Timer verwenden - ich möchte nicht, dass Nachrichten, die wirklich lange warten, durchkommen, nur weil der Puffer nicht voll ist.

Was ist ein guter Weg, um dies zu tun?

jbu
quelle

Antworten:

86

Nur weil Sie einen Puffer verwenden, bedeutet dies nicht, dass der Stream diesen Puffer füllen muss . Mit anderen Worten, das sollte in Ordnung sein:

public static void copyStream(InputStream input, OutputStream output)
    throws IOException
{
    byte[] buffer = new byte[1024]; // Adjust if you want
    int bytesRead;
    while ((bytesRead = input.read(buffer)) != -1)
    {
        output.write(buffer, 0, bytesRead);
    }
}

Das sollte gut funktionieren - im Grunde wird der readAnruf blockiert, bis einige Daten verfügbar sind, aber er wird nicht warten, bis alle verfügbar sind, um den Puffer zu füllen. (Ich nehme an, es konnte, und ich glaube , FileInputStreamin der Regel wird der Puffer füllen, sondern ein Strom an eine Steckdose befestigt ist wahrscheinlicher , Sie sofort die Daten zu geben.)

Ich denke, es lohnt sich, zuerst diese einfache Lösung auszuprobieren.

Jon Skeet
quelle
Ja, ich denke das klärt die Dinge auf. Ich glaube, ich wurde mit readFully () verwechselt, für das der Puffer gefüllt werden muss.
jbu
1
Ich habe Ihren Code ausprobiert und ich habe auch versucht, Nachricht für Nachricht zu lesen, indem ich die Länge der Nachricht gelesen und dann ein Byte [] buf = length; inputstream.read (buf) .... die letztere Methode war schneller und ich bin mir nicht sicher warum. Es scheint mehr Codezeilen auszuführen, ist aber schneller. Fast 2x so schnell.
jbu
2
@Zibbobz: Jede Array-Größe funktioniert - je größer sie ist, desto weniger Lesevorgänge werden benötigt, aber desto mehr Speicher wird benötigt, während sie funktioniert. Es ist nicht so, dass es die tatsächliche Länge des Streams sein muss.
Jon Skeet
1
@sgibly: Nun, da ein close()Wille es trotzdem spült , denke ich nicht, dass es sich persönlich lohnt. Wenn Sie Code wie diesen nehmen, können Sie ihn natürlich gerne hinzufügen :)
Jon Skeet
1
@sgibly: Ich würde sagen, dass es eher schlecht dokumentiert ist als die Absicht , dass jeder Flush anrufen muss ...
Jon Skeet
77

Wie wäre es einfach mit

void feedInputToOutput(InputStream in, OutputStream out) {
   IOUtils.copy(in, out);
}

und damit fertig sein?

aus der jakarta apache commons i / o Bibliothek, die bereits von einer Vielzahl von Projekten verwendet wird, sodass Sie das Glas wahrscheinlich bereits in Ihrem Klassenpfad haben.

Dean Hiller
quelle
23
oder verwenden Sie einfach die Funktion selbst, da das Aufrufen einer anderen Funktion mit genau den gleichen Parametern nicht erforderlich ist ....
Android-Entwickler
Ja, das mache ich persönlich. Ich glaube, ich habe nur den zusätzlichen Methodennamen als Dokumentation eingegeben, aber er wird nicht benötigt.
Dean Hiller
1
Soweit ich das beurteilen kann, blockiert diese Methode, bis die while-Eingabe durchlaufen wird. Daher sollte dies in einem asynchronen Thread für den Fragesteller erfolgen.
Jonah
10

Sie können einen Ringpuffer verwenden:

Code

// buffer all data in a circular buffer of infinite size
CircularByteBuffer cbb = new CircularByteBuffer(CircularByteBuffer.INFINITE_SIZE);
class1.putDataOnOutputStream(cbb.getOutputStream());
class2.processDataFromInputStream(cbb.getInputStream());


Maven-Abhängigkeit

<dependency>
    <groupId>org.ostermiller</groupId>
    <artifactId>utils</artifactId>
    <version>1.07.00</version>
</dependency>


Modendetails

http://ostermiller.org/utils/CircularBuffer.html

Stephan
quelle
8

Asynchroner Weg, um dies zu erreichen.

void inputStreamToOutputStream(final InputStream inputStream, final OutputStream out) {
    Thread t = new Thread(new Runnable() {

        public void run() {
            try {
                int d;
                while ((d = inputStream.read()) != -1) {
                    out.write(d);
                }
            } catch (IOException ex) {
                //TODO make a callback on exception.
            }
        }
    });
    t.setDaemon(true);
    t.start();
}
Daniel De León
quelle
Dies dient zum Übertragen von Daten von einem Stream zu einem anderen, ohne den aktuellen Thread zu blockieren.
Daniel De León
3

BUFFER_SIZE ist die Größe der einzulesenden Spannfutter. Sollte> 1 KB und <10 MB sein.

private static final int BUFFER_SIZE = 2 * 1024 * 1024;
private void copy(InputStream input, OutputStream output) throws IOException {
    try {
        byte[] buffer = new byte[BUFFER_SIZE];
        int bytesRead = input.read(buffer);
        while (bytesRead != -1) {
            output.write(buffer, 0, bytesRead);
            bytesRead = input.read(buffer);
        }
    //If needed, close streams.
    } finally {
        input.close();
        output.close();
    }
}
Leandro Latorre
quelle
2
Sollte viel weniger als 10 MB sein. Dies ist TCP, über das wir sprechen. Jede Größe, die größer als der Socket-Empfangspuffer ist, ist völlig sinnlos und wird in Kilobyte und nicht in Megabyte gemessen.
user207421
2

Verwenden Sie org.apache.commons.io.IOUtils

InputStream inStream = new ...
OutputStream outStream = new ...
IOUtils.copy(inStream, outStream);

oder copyLarge für Größe> 2 GB

Adam111p
quelle
1

Dies ist eine Scala-Version, die sauber und schnell ist (kein Stackoverflow):

  import scala.annotation.tailrec
  import java.io._

  implicit class InputStreamOps(in: InputStream) {
    def >(out: OutputStream): Unit = pipeTo(out)

    def pipeTo(out: OutputStream, bufferSize: Int = 1<<10): Unit = pipeTo(out, Array.ofDim[Byte](bufferSize))

    @tailrec final def pipeTo(out: OutputStream, buffer: Array[Byte]): Unit = in.read(buffer) match {
      case n if n > 0 =>
        out.write(buffer, 0, n)
        pipeTo(out, buffer)
      case _ =>
        in.close()
        out.close()
    }
  }

Dies ermöglicht die Verwendung von >Symbolen zB inputstream > outputstreamund die Übergabe von benutzerdefinierten Puffern / Größen.

Pathikrit
quelle
Könnten Sie eine ähnliche Java-Implementierung bereitstellen?
Luchostein
1
@ Luchostein: Ich antwortete auf die fehlerhafte Scala-Antwort von George Pligor unten
Pathikrit
-16

Wenn Sie sich für Funktionen interessieren, ist dies eine in Scala geschriebene Funktion, die zeigt, wie Sie einen Eingabestream nur mit vals (und nicht mit vars) in einen Ausgabestream kopieren können.

def copyInputToOutputFunctional(inputStream: InputStream, outputStream: OutputStream,bufferSize: Int) {
  val buffer = new Array[Byte](bufferSize);
  def recurse() {
    val len = inputStream.read(buffer);
    if (len > 0) {
      outputStream.write(buffer.take(len));
      recurse();
    }
  }
  recurse();
}

Beachten Sie, dass dies nicht für die Verwendung in einer Java-Anwendung mit wenig verfügbarem Speicher empfohlen wird, da bei einer rekursiven Funktion leicht ein Stapelüberlauf-Ausnahmefehler auftreten kann

George Pligoropoulos
quelle
12
-1: Wie ist eine rekursive Scala-Lösung für eine Java-Frage relevant?
Tomlogic
2
Die Methodenrekursion ist schwanzrekursiv. Wenn Sie es mit @tailrec anotieren, treten keine Stapelüberlaufprobleme auf.
Simão Martins
1
Diese Antwort bestätigt, dass alle reinen Java-Programmierer unter dem Druck ihrer Chefs leiden und ernsthaftes Wutmanagement benötigen!
George Pligoropoulos