Weiterleiten eines Akka-Streams an einen Upstream-Dienst zum Auffüllen

9

Ich muss einen Upstream-Dienst (Azure Blob-Dienst) aufrufen, um Daten an einen OutputStream zu senden, den ich dann umdrehen und über akka an den Client zurücksenden muss. Ohne akka (und nur Servlet-Code) würde ich nur den ServletOutputStream erhalten und ihn an die Methode des Azure-Dienstes übergeben.

Das nächste, über das ich stolpern kann, und das ist eindeutig falsch, ist so etwas

        Source<ByteString, OutputStream> source = StreamConverters.asOutputStream().mapMaterializedValue(os -> {
            blobClient.download(os);
            return os;
        });

        ResponseEntity resposeEntity = HttpEntities.create(ContentTypes.APPLICATION_OCTET_STREAM, preAuthData.getFileSize(), source);

        sender().tell(new RequestResult(resposeEntity, StatusCodes.OK), self());

Die Idee ist, dass ich einen Upstream-Dienst aufrufe, um einen Ausgabestream zu erhalten, der durch Aufrufen von blobClient.download (os) gefüllt wird.

Es scheint, als würde die Lambda-Funktion aufgerufen und zurückgegeben, aber danach schlägt sie fehl, weil keine Daten oder ähnliches vorhanden sind. Als ob ich nicht hätte, dass diese Lambda-Funktion die Arbeit erledigt, aber vielleicht ein Objekt zurückgibt, das die Arbeit erledigt? Nicht sicher.

Wie macht man das?

MeBigFatGuy
quelle
Wie ist das Verhalten von download? Werden Daten gestreamt osund erst zurückgegeben, wenn die Daten fertig geschrieben wurden?
Alec

Antworten:

2

Das eigentliche Problem hierbei ist, dass die Azure-API nicht für den Gegendruck ausgelegt ist. Der Ausgabestream kann Azure nicht signalisieren, dass er nicht für weitere Daten bereit ist. Anders ausgedrückt: Wenn Azure Daten schneller überträgt, als Sie sie verbrauchen können, muss irgendwo ein hässlicher Pufferüberlauffehler auftreten.

Wenn wir diese Tatsache akzeptieren, ist das nächstbeste, was wir tun können:

  • Verwenden Sie Source.lazySourcediese Option , um das Herunterladen von Daten nur zu starten, wenn eine nachgelagerte Anforderung besteht (auch bekannt als die Quelle wird ausgeführt und Daten werden angefordert).
  • Fügen Sie den downloadAufruf in einen anderen Thread ein, damit er weiterhin ausgeführt wird, ohne die Rückgabe der Quelle zu blockieren. Ein Weg, dies zu tun, ist mit a Future(Ich bin nicht sicher, welche Java-Best Practices es gibt, sollte aber in beiden Fällen gut funktionieren). Obwohl dies zunächst keine Rolle spielt, müssen Sie möglicherweise einen anderen Ausführungskontext als auswählen system.dispatcher- alles hängt davon ab, ob downloadblockiert wird oder nicht.

Ich entschuldige mich im Voraus, wenn dieser Java-Code fehlerhaft ist - ich verwende Akka mit Scala, daher ist dies alles auf die Akka-Java-API und die Java-Syntaxreferenz zurückzuführen.

ResponseEntity responseEntity = HttpEntities.create(
  ContentTypes.APPLICATION_OCTET_STREAM,
  preAuthData.getFileSize(),

  // Wait until there is downstream demand to intialize the source...
  Source.lazySource(() -> {
    // Pre-materialize the outputstream before the source starts running
    Pair<OutputStream, Source<ByteString, NotUsed>> pair =
      StreamConverters.asOutputStream().preMaterialize(system);

    // Start writing into the download stream in a separate thread
    Futures.future(() -> { blobClient.download(pair.first()); return pair.first(); }, system.getDispatcher());

    // Return the source - it should start running since `lazySource` indicated demand
    return pair.second();
  })
);

sender().tell(new RequestResult(responseEntity, StatusCodes.OK), self());
Alec
quelle
Fantastisch. Vielen Dank. Eine kleine Änderung an Ihrem Beispiel lautet: Futures.future (() -> {blobClient.download (pair.first ()); return pair.first ();}, system.getDispatcher ());
MeBigFatGuy
@ MeBigFatGuy Richtig, danke!
Alec
1

Der OutputStreamin diesem Fall ist der "materialisierte Wert" von Sourceund wird erst erstellt, wenn der Stream ausgeführt wird (oder in einen laufenden Stream "materialisiert" wird). Das Ausführen liegt außerhalb Ihrer Kontrolle, da Sie das Sourcean Akka HTTP übergeben und das später tatsächlich Ihre Quelle ausführt .

.mapMaterializedValue(matval -> ...)wird normalerweise verwendet, um den materialisierten Wert zu transformieren, aber da er als Teil der Materialisierung aufgerufen wird, können Sie damit Nebenwirkungen wie das Senden des Matval in einer Nachricht ausführen, genau wie Sie es herausgefunden haben, es ist nicht unbedingt etwas falsch daran das auch wenn es funky aussieht. Es ist wichtig zu verstehen, dass der Stream seine Materialisierung erst abschließt und läuft, wenn das Lambda abgeschlossen ist. Dies bedeutet Probleme, wenn download()blockiert wird, anstatt einige Arbeiten an einem anderen Thread abzubrechen und sofort zurückzukehren.

Es gibt jedoch eine andere Lösung: Source.preMaterialize()Es materialisiert die Quelle und gibt Ihnen einePair der materialisierten Werte und eine neue Source, die verwendet werden kann, um die bereits gestartete Quelle zu verbrauchen:

Pair<OutputStream, Source<ByteString, NotUsed>> pair = 
  StreamConverters.asOutputStream().preMaterialize(system);
OutputStream os = pair.first();
Source<ByteString, NotUsed> source = pair.second();

Beachten Sie, dass Ihr Code einige zusätzliche Dinge zu beachten hat, vor allem, wenn der blobClient.download(os)Anruf blockiert wird, bis er abgeschlossen ist, und Sie dies vom Schauspieler aus aufrufen. In diesem Fall müssen Sie sicherstellen, dass Ihr Schauspieler den Dispatcher nicht verhungert und anhält andere Akteure in Ihrer Anwendung können nicht ausgeführt werden (siehe Akka-Dokumente: https://doc.akka.io/docs/akka/current/typed/dispatchers.html#blocking-needs-careful-management ).

Johanandren
quelle
1
Danke für die Antwort. Ich sehe nicht, wie das möglicherweise funktionieren könnte? Wohin gehen die Bytes, wenn blobClient.download (os) aufgerufen wird (wenn ich es selbst aufrufe)? Stellen Sie sich vor, ein Terabyte Daten wartet darauf, geschrieben zu werden. Es scheint mir, dass der Aufruf von blobClient.download vom Aufruf sender.tell aufgerufen werden muss, damit dies im Grunde eine IOUtils.copy-ähnliche Operation ist. Mit preMaterialize kann ich nicht sehen, wie das passiert?
MeBigFatGuy
Der OutputStream verfügt über einen internen Puffer. Er akzeptiert Schreibvorgänge, bis dieser Puffer voll ist. Wenn der asynchrone Downstream bis dahin keine Elemente verbraucht hat, blockiert er den Schreibthread (weshalb ich erwähnt habe, dass es wichtig ist, das Blockieren zu handhaben).
Johanandren
1
Aber wenn ich vormaterialisiere und den OutputStream bekomme, dann ist es mein Code, der den blobClient.download (os) ausführt; richtig? Das heißt, es muss abgeschlossen sein, bevor ich fortfahren kann, was unmöglich ist.
MeBigFatGuy
Wenn download (os) einen Thread nicht verzweigt, müssen Sie damit umgehen, dass er blockiert, und sicherstellen, dass kein anderer Vorgang gestoppt wird. Eine Möglichkeit wäre, einen Thread zu teilen, um die Arbeit zu erledigen, eine andere wäre, zuerst vom Schauspieler zu antworten und dann dort die Blockierungsarbeit zu erledigen. In diesem Fall müssen Sie sicherstellen, dass der Schauspieler keine anderen Schauspieler verhungert, siehe den Link am Ende von meine Antwort.
Johanandren
An diesem Punkt versuche ich nur, es überhaupt zum Laufen zu bringen. Es kann nicht einmal eine 10-Byte-Datei verarbeiten.
MeBigFatGuy