Ich muss einen Upstream-Dienst (Azure Blob-Dienst) aufrufen, um Daten an einen OutputStream zu senden, den ich dann umdrehen und über akka an den Client zurücksenden muss. Ohne akka (und nur Servlet-Code) würde ich nur den ServletOutputStream erhalten und ihn an die Methode des Azure-Dienstes übergeben.
Das nächste, über das ich stolpern kann, und das ist eindeutig falsch, ist so etwas
Source<ByteString, OutputStream> source = StreamConverters.asOutputStream().mapMaterializedValue(os -> {
blobClient.download(os);
return os;
});
ResponseEntity resposeEntity = HttpEntities.create(ContentTypes.APPLICATION_OCTET_STREAM, preAuthData.getFileSize(), source);
sender().tell(new RequestResult(resposeEntity, StatusCodes.OK), self());
Die Idee ist, dass ich einen Upstream-Dienst aufrufe, um einen Ausgabestream zu erhalten, der durch Aufrufen von blobClient.download (os) gefüllt wird.
Es scheint, als würde die Lambda-Funktion aufgerufen und zurückgegeben, aber danach schlägt sie fehl, weil keine Daten oder ähnliches vorhanden sind. Als ob ich nicht hätte, dass diese Lambda-Funktion die Arbeit erledigt, aber vielleicht ein Objekt zurückgibt, das die Arbeit erledigt? Nicht sicher.
Wie macht man das?
quelle
download
? Werden Daten gestreamtos
und erst zurückgegeben, wenn die Daten fertig geschrieben wurden?Antworten:
Das eigentliche Problem hierbei ist, dass die Azure-API nicht für den Gegendruck ausgelegt ist. Der Ausgabestream kann Azure nicht signalisieren, dass er nicht für weitere Daten bereit ist. Anders ausgedrückt: Wenn Azure Daten schneller überträgt, als Sie sie verbrauchen können, muss irgendwo ein hässlicher Pufferüberlauffehler auftreten.
Wenn wir diese Tatsache akzeptieren, ist das nächstbeste, was wir tun können:
Source.lazySource
diese Option , um das Herunterladen von Daten nur zu starten, wenn eine nachgelagerte Anforderung besteht (auch bekannt als die Quelle wird ausgeführt und Daten werden angefordert).download
Aufruf in einen anderen Thread ein, damit er weiterhin ausgeführt wird, ohne die Rückgabe der Quelle zu blockieren. Ein Weg, dies zu tun, ist mit aFuture
(Ich bin nicht sicher, welche Java-Best Practices es gibt, sollte aber in beiden Fällen gut funktionieren). Obwohl dies zunächst keine Rolle spielt, müssen Sie möglicherweise einen anderen Ausführungskontext als auswählensystem.dispatcher
- alles hängt davon ab, obdownload
blockiert wird oder nicht.Ich entschuldige mich im Voraus, wenn dieser Java-Code fehlerhaft ist - ich verwende Akka mit Scala, daher ist dies alles auf die Akka-Java-API und die Java-Syntaxreferenz zurückzuführen.
quelle
Der
OutputStream
in diesem Fall ist der "materialisierte Wert" vonSource
und wird erst erstellt, wenn der Stream ausgeführt wird (oder in einen laufenden Stream "materialisiert" wird). Das Ausführen liegt außerhalb Ihrer Kontrolle, da Sie dasSource
an Akka HTTP übergeben und das später tatsächlich Ihre Quelle ausführt ..mapMaterializedValue(matval -> ...)
wird normalerweise verwendet, um den materialisierten Wert zu transformieren, aber da er als Teil der Materialisierung aufgerufen wird, können Sie damit Nebenwirkungen wie das Senden des Matval in einer Nachricht ausführen, genau wie Sie es herausgefunden haben, es ist nicht unbedingt etwas falsch daran das auch wenn es funky aussieht. Es ist wichtig zu verstehen, dass der Stream seine Materialisierung erst abschließt und läuft, wenn das Lambda abgeschlossen ist. Dies bedeutet Probleme, wenndownload()
blockiert wird, anstatt einige Arbeiten an einem anderen Thread abzubrechen und sofort zurückzukehren.Es gibt jedoch eine andere Lösung:
Source.preMaterialize()
Es materialisiert die Quelle und gibt Ihnen einePair
der materialisierten Werte und eine neueSource
, die verwendet werden kann, um die bereits gestartete Quelle zu verbrauchen:Beachten Sie, dass Ihr Code einige zusätzliche Dinge zu beachten hat, vor allem, wenn der
blobClient.download(os)
Anruf blockiert wird, bis er abgeschlossen ist, und Sie dies vom Schauspieler aus aufrufen. In diesem Fall müssen Sie sicherstellen, dass Ihr Schauspieler den Dispatcher nicht verhungert und anhält andere Akteure in Ihrer Anwendung können nicht ausgeführt werden (siehe Akka-Dokumente: https://doc.akka.io/docs/akka/current/typed/dispatchers.html#blocking-needs-careful-management ).quelle