Die Akka Streams-Bibliothek enthält bereits eine Fülle von Dokumentationen . Das Hauptproblem für mich ist jedoch, dass es zu viel Material liefert - ich fühle mich ziemlich überwältigt von der Anzahl der Konzepte, die ich lernen muss. Viele der dort gezeigten Beispiele fühlen sich sehr schwer an und können nicht einfach in reale Anwendungsfälle übersetzt werden. Sie sind daher ziemlich esoterisch. Ich denke, es gibt viel zu viele Details, ohne zu erklären, wie alle Bausteine zusammengesetzt werden und wie genau es hilft, bestimmte Probleme zu lösen.
Es gibt Quellen, Senken, Flüsse, Graphstufen, Teilgraphen, Materialisierung, ein Graph-DSL und vieles mehr, und ich weiß einfach nicht, wo ich anfangen soll. Die Kurzanleitung soll als Ausgangspunkt dienen, aber ich verstehe sie nicht. Es werden nur die oben genannten Konzepte eingefügt, ohne sie zu erklären. Außerdem können die Codebeispiele nicht ausgeführt werden - es fehlen Teile, die es mir mehr oder weniger unmöglich machen, dem Text zu folgen.
Kann jemand die Konzepte Quellen, Senken, Flüsse, Graphstufen, Teilgraphen, Materialisierung und vielleicht einige andere Dinge erklären, die ich in einfachen Worten und mit einfachen Beispielen vermisst habe, die nicht jedes einzelne Detail erklären (und die wahrscheinlich sowieso nicht benötigt werden) der Anfang)?
quelle
Antworten:
Diese Antwort basiert auf der
akka-stream
Version2.4.2
. Die API kann in anderen Versionen leicht abweichen. Die Abhängigkeit kann von sbt genutzt werden :Okay, lass uns anfangen. Die API von Akka Streams besteht aus drei Haupttypen. Im Gegensatz zu Reactive Streams sind diese Typen viel leistungsfähiger und daher komplexer. Es wird davon ausgegangen, dass für alle Codebeispiele bereits folgende Definitionen existieren:
Die
import
Anweisungen werden für die Typdeklarationen benötigt.system
repräsentiert das Akteursystem von Akka undmaterializer
repräsentiert den Bewertungskontext des Streams. In unserem Fall verwenden wir aActorMaterializer
, was bedeutet, dass die Streams über den Akteuren ausgewertet werden. Beide Werte sind als gekennzeichnetimplicit
, wodurch der Scala-Compiler die Möglichkeit hat, diese beiden Abhängigkeiten bei Bedarf automatisch einzufügen. Wir importieren auchsystem.dispatcher
, was ein Ausführungskontext für istFutures
.Eine neue API
Akka Streams haben folgende Schlüsseleigenschaften:
Materializer
.Source
,Sink
undFlow
. Die Bausteine bilden ein Diagramm, dessen Auswertung auf dem basiertMaterializer
und explizit ausgelöst werden muss.Im Folgenden wird eine tiefere Einführung in die Verwendung der drei Haupttypen gegeben.
Quelle
A
Source
ist ein Datenersteller und dient als Eingabequelle für den Stream. JederSource
hat einen einzelnen Ausgangskanal und keinen Eingangskanal. Alle Daten fließen über den Ausgangskanal zu dem, was an das angeschlossen istSource
.Bild genommen von boldradius.com .
A
Source
kann auf verschiedene Arten erstellt werden:In den oben genannten Fällen haben wir die
Source
mit endlichen Daten gespeist , was bedeutet, dass sie irgendwann enden werden. Man sollte nicht vergessen, dass Reactive Streams standardmäßig faul und asynchron sind. Dies bedeutet, dass man explizit die Auswertung des Streams anfordern muss. In Akka Streams kann dies über dierun*
Methoden erfolgen. DasrunForeach
wäre nicht anders als die bekannteforeach
Funktion - durch dierun
Hinzufügung wird deutlich, dass wir um eine Auswertung des Streams bitten. Da endliche Daten langweilig sind, fahren wir mit unendlich fort:Mit der
take
Methode können wir einen künstlichen Stopppunkt erstellen, der uns daran hindert, unbegrenzt zu bewerten. Da die Schauspielerunterstützung integriert ist, können wir den Stream auch problemlos mit Nachrichten versorgen, die an einen Schauspieler gesendet werden:Wir können sehen, dass die
Futures
asynchron auf verschiedenen Threads ausgeführt werden, was das Ergebnis erklärt. Im obigen Beispiel ist ein Puffer für die eingehenden Elemente nicht erforderlich. DaherOverflowStrategy.fail
können wir mit konfigurieren, dass der Stream bei einem Pufferüberlauf fehlschlagen soll. Insbesondere über diese Akteurschnittstelle können wir den Stream durch jede Datenquelle speisen. Es spielt keine Rolle, ob die Daten von demselben Thread, von einem anderen, von einem anderen Prozess erstellt werden oder ob sie von einem Remote-System über das Internet stammen.Sinken
A
Sink
ist im Grunde das Gegenteil von aSource
. Es ist der Endpunkt eines Streams und verbraucht daher Daten. ASink
hat einen einzelnen Eingangskanal und keinen Ausgangskanal.Sinks
werden insbesondere benötigt, wenn das Verhalten des Datenkollektors wiederverwendbar und ohne Auswertung des Streams angegeben werden soll. Die bereits bekanntenrun*
Methoden erlauben uns diese Eigenschaften nicht, daher wird sie bevorzugt verwendetSink
.Bild genommen von boldradius.com .
Ein kurzes Beispiel für eine
Sink
Aktion:Das Verbinden von a
Source
mit aSink
kann mit derto
Methode erfolgen. Es gibt einen sogenannten Stream zurück,RunnableFlow
wie wir später sehen werden, eine spezielle Form von aFlow
- einen Stream, der durch einfaches Aufrufen seinerrun()
Methode ausgeführt werden kann .Bild genommen von boldradius.com .
Es ist natürlich möglich, alle Werte, die zu einer Senke kommen, an einen Schauspieler weiterzuleiten:
Fließen
Datenquellen und -senken sind großartig, wenn Sie eine Verbindung zwischen Akka-Streams und einem vorhandenen System benötigen, aber mit ihnen nichts wirklich anfangen können. Flows sind das letzte fehlende Teil in der Basisabstraktion von Akka Streams. Sie fungieren als Konnektor zwischen verschiedenen Streams und können zur Transformation ihrer Elemente verwendet werden.
Bild genommen von boldradius.com .
Wenn a
Flow
mit a verbunden ist, istSource
ein neuesSource
das Ergebnis. Ebenso schafft einFlow
mit einem verbundenesSink
ein neuesSink
. Und aFlow
verbunden mit aSource
und aSink
ergibt aRunnableFlow
. Daher befinden sie sich zwischen dem Eingangs- und dem Ausgangskanal, entsprechen jedoch nicht einer der Geschmacksrichtungen, solange sie weder mit aSource
noch mit a verbunden sindSink
.Bild genommen von boldradius.com .
Um dies besser zu verstehen
Flows
, werden wir uns einige Beispiele ansehen:Über die
via
Methode können wir aSource
mit a verbindenFlow
. Wir müssen den Eingabetyp angeben, da der Compiler ihn für uns nicht ableiten kann. Wie wir bereits in diesem einfachen Beispiel sehen können, fließen die Dateninvert
unddouble
sind völlig unabhängig von Datenproduzenten und -konsumenten. Sie transformieren nur die Daten und leiten sie an den Ausgangskanal weiter. Dies bedeutet, dass wir einen Fluss zwischen mehreren Streams wiederverwenden können:s1
unds2
stellen völlig neue Streams dar - sie teilen keine Daten über ihre Bausteine.Ungebundene Datenströme
Bevor wir fortfahren, sollten wir zunächst einige der wichtigsten Aspekte von Reactive Streams erneut betrachten. Eine unbegrenzte Anzahl von Elementen kann an jedem Punkt ankommen und einen Stream in verschiedene Zustände versetzen. Abgesehen von einem ausführbaren Stream, der der übliche Zustand ist, kann ein Stream entweder durch einen Fehler oder durch ein Signal gestoppt werden, das angibt, dass keine weiteren Daten ankommen. Ein Stream kann grafisch modelliert werden, indem Ereignisse auf einer Zeitachse markiert werden, wie dies hier der Fall ist:
Bild aus Die Einführung in die reaktive Programmierung, die Sie vermisst haben .
In den Beispielen des vorherigen Abschnitts haben wir bereits lauffähige Flows gesehen. Wir erhalten ein,
RunnableGraph
wann immer ein Stream tatsächlich materialisiert werden kann, was bedeutet, dass aSink
mit a verbunden istSource
. Bisher haben wir uns immer auf den Wert materialisiert, derUnit
in den Typen zu sehen ist:Für
Source
undSink
der zweite Typparameter und fürFlow
den dritten Typparameter bezeichnen den materialisierten Wert. In dieser Antwort wird die volle Bedeutung der Materialisierung nicht erklärt. Weitere Details zur Materialisierung finden Sie jedoch in der offiziellen Dokumentation . Im Moment müssen wir nur wissen, dass der materialisierte Wert das ist, was wir erhalten, wenn wir einen Stream ausführen. Da wir bisher nur an Nebenwirkungen interessiert waren, erhielten wirUnit
als materialisierten Wert. Die Ausnahme war die Materialisierung einer Spüle, die zu einerFuture
. Es gab uns ein zurückFuture
, da dieser Wert angeben kann, wann der mit der Senke verbundene Stream beendet wurde. Bisher waren die vorherigen Codebeispiele nett, um das Konzept zu erklären, aber sie waren auch langweilig, weil wir uns nur mit endlichen oder sehr einfachen unendlichen Strömen befassten. Um es interessanter zu machen, wird im Folgenden ein vollständiger asynchroner und unbegrenzter Stream erläutert.ClickStream-Beispiel
Als Beispiel möchten wir einen Stream haben, der Klickereignisse erfasst. Nehmen wir an, wir möchten auch Klickereignisse gruppieren, die kurz nacheinander auftreten. Auf diese Weise konnten wir leicht doppelte, dreifache oder zehnfache Klicks feststellen. Außerdem wollen wir alle Einzelklicks herausfiltern. Atmen Sie tief ein und stellen Sie sich vor, wie Sie dieses Problem unbedingt lösen würden. Ich wette, niemand könnte eine Lösung implementieren, die beim ersten Versuch korrekt funktioniert. In reaktiver Weise ist dieses Problem trivial zu lösen. Tatsächlich ist die Lösung so einfach und unkompliziert zu implementieren, dass wir sie sogar in einem Diagramm ausdrücken können, das das Verhalten des Codes direkt beschreibt:
Bild aus Die Einführung in die reaktive Programmierung, die Sie vermisst haben .
Die grauen Kästchen sind Funktionen, die beschreiben, wie ein Stream in einen anderen umgewandelt wird. Mit der
throttle
Funktion akkumulieren wir Klicks innerhalb von 250 Millisekunden. Die Funktionenmap
undfilter
sollten selbsterklärend sein. Die Farbkugeln stellen ein Ereignis dar und die Pfeile zeigen, wie sie durch unsere Funktionen fließen. Später in den Verarbeitungsschritten erhalten wir immer weniger Elemente, die durch unseren Stream fließen, da wir sie zusammenfassen und herausfiltern. Der Code für dieses Bild würde ungefähr so aussehen:Die gesamte Logik kann in nur vier Codezeilen dargestellt werden! In Scala könnten wir es noch kürzer schreiben:
Die Definition von
clickStream
ist etwas komplexer, aber dies ist nur der Fall, weil das Beispielprogramm auf der JVM ausgeführt wird, wo das Erfassen von Klickereignissen nicht einfach möglich ist. Eine weitere Komplikation ist, dass Akka diethrottle
Funktion standardmäßig nicht bereitstellt . Stattdessen mussten wir es selbst schreiben. Da diese Funktion ist (wie es für dasmap
oder das der Fall istfilter
Funktionen ) für verschiedene Anwendungsfälle wiederverwendbar ist, zähle ich diese Zeilen nicht zur Anzahl der Zeilen, die wir zur Implementierung der Logik benötigen. In imperativen Sprachen ist es jedoch normal, dass Logik nicht so einfach wiederverwendet werden kann und dass die verschiedenen logischen Schritte alle an einem Ort stattfinden, anstatt nacheinander angewendet zu werden, was bedeutet, dass wir unseren Code wahrscheinlich mit der Drossellogik falsch geformt hätten. Das vollständige Codebeispiel ist als verfügbarKern und wird hier nicht weiter diskutiert.SimpleWebServer Beispiel
Was stattdessen diskutiert werden sollte, ist ein weiteres Beispiel. Während der Klick-Stream ein gutes Beispiel dafür ist, wie Akka Streams ein Beispiel aus der realen Welt verarbeiten kann, fehlt ihm die Fähigkeit, die parallele Ausführung in Aktion zu zeigen. Das nächste Beispiel soll einen kleinen Webserver darstellen, der mehrere Anforderungen parallel verarbeiten kann. Der Web-Server muss in der Lage sein, eingehende Verbindungen zu akzeptieren und Byte-Sequenzen von diesen zu empfangen, die druckbare ASCII-Zeichen darstellen. Diese Bytefolgen oder Zeichenfolgen sollten bei allen Zeilenumbrüchen in kleinere Teile aufgeteilt werden. Danach antwortet der Server dem Client mit jeder der geteilten Leitungen. Alternativ könnte es etwas anderes mit den Zeilen tun und ein spezielles Antwort-Token geben, aber wir möchten es in diesem Beispiel einfach halten und daher keine ausgefallenen Funktionen einführen. Merken, Der Server muss in der Lage sein, mehrere Anforderungen gleichzeitig zu verarbeiten. Dies bedeutet im Grunde, dass keine Anforderung andere Anforderungen für die weitere Ausführung blockieren darf. Das Lösen all dieser Anforderungen kann auf zwingende Weise schwierig sein - mit Akka Streams sollten wir jedoch nicht mehr als ein paar Zeilen benötigen, um diese zu lösen. Lassen Sie uns zunächst einen Überblick über den Server selbst geben:
Grundsätzlich gibt es nur drei Hauptbausteine. Der erste muss eingehende Verbindungen akzeptieren. Der zweite muss eingehende Anfragen bearbeiten und der dritte muss eine Antwort senden. Die Implementierung all dieser drei Bausteine ist nur wenig komplizierter als die Implementierung des Klick-Streams:
Die Funktion verwendet
mkServer
(neben der Adresse und dem Port des Servers) auch ein Akteursystem und einen Materialisierer als implizite Parameter. Der Kontrollfluss des Servers wird durch dargestelltbinding
, der eine Quelle eingehender Verbindungen nimmt und diese an eine Senke eingehender Verbindungen weiterleitet. InnerhalbconnectionHandler
unserer Spüle behandeln wir jede Verbindung durch den FlussserverLogic
, der später beschrieben wird.binding
gibt a zurückFuture
Dies wird abgeschlossen, wenn der Server gestartet wurde oder der Start fehlgeschlagen ist. Dies kann der Fall sein, wenn der Port bereits von einem anderen Prozess belegt wird. Der Code spiegelt jedoch die Grafik nicht vollständig wider, da wir keinen Baustein sehen können, der Antworten verarbeitet. Der Grund dafür ist, dass die Verbindung diese Logik bereits selbst bereitstellt. Es ist ein bidirektionaler Fluss und nicht nur ein unidirektionaler Fluss, wie wir ihn in den vorherigen Beispielen gesehen haben. Wie bei der Materialisierung sollen solche komplexen Strömungen hier nicht erklärt werden. Die offizielle Dokumentation enthält reichlich Material für komplexere Flussdiagramme. Im Moment reicht es aus, diesen zu kennenTcp.IncomingConnection
eine Verbindung darstellt, die weiß, wie man Anfragen empfängt und wie man Antworten sendet. Der Teil, der noch fehlt, ist derserverLogic
Baustein. Es kann so aussehen:Wieder einmal können wir die Logik in mehrere einfache Bausteine aufteilen, die alle zusammen den Ablauf unseres Programms bilden. Zuerst wollen wir unsere Bytesequenz in Zeilen aufteilen, was wir tun müssen, wenn wir ein Zeilenumbruchzeichen finden. Danach müssen die Bytes jeder Zeile in eine Zeichenfolge konvertiert werden, da das Arbeiten mit Rohbytes umständlich ist. Insgesamt könnten wir einen Binärstrom eines komplizierten Protokolls erhalten, was die Arbeit mit den eingehenden Rohdaten äußerst schwierig machen würde. Sobald wir eine lesbare Zeichenfolge haben, können wir eine Antwort erstellen. Aus Gründen der Einfachheit kann die Antwort in unserem Fall alles sein. Am Ende müssen wir unsere Antwort in eine Folge von Bytes zurückkonvertieren, die über die Leitung gesendet werden können. Der Code für die gesamte Logik kann folgendermaßen aussehen:
Wir wissen bereits, dass dies
serverLogic
ein Fluss ist, der a nimmtByteString
und a produzieren mussByteString
. Mit könnendelimiter
wir einByteString
in kleinere Teile teilen - in unserem Fall muss es passieren, wenn ein Zeilenumbruch auftritt.receiver
ist der Fluss, der alle Split-Byte-Sequenzen in eine Zeichenfolge konvertiert. Dies ist natürlich eine gefährliche Konvertierung, da nur druckbare ASCII-Zeichen in eine Zeichenfolge konvertiert werden sollten, aber für unsere Anforderungen ist dies gut genug.responder
ist die letzte Komponente und ist dafür verantwortlich, eine Antwort zu erstellen und die Antwort wieder in eine Folge von Bytes umzuwandeln. Im Gegensatz zur Grafik haben wir diese letzte Komponente nicht in zwei Teile geteilt, da die Logik trivial ist. Am Ende verbinden wir alle Flüsse durch dievia
Funktion. An dieser Stelle kann man sich fragen, ob wir uns um die eingangs erwähnte Mehrbenutzer-Eigenschaft gekümmert haben. Und tatsächlich haben wir es getan, obwohl es möglicherweise nicht sofort offensichtlich ist. Wenn Sie sich diese Grafik ansehen, sollte es klarer werden:Die
serverLogic
Komponente ist nichts anderes als ein Fluss, der kleinere Flüsse enthält. Diese Komponente nimmt eine Eingabe, die eine Anforderung ist, und erzeugt eine Ausgabe, die die Antwort ist. Da Flows mehrfach erstellt werden können und alle unabhängig voneinander arbeiten, erreichen wir durch diese Verschachtelung unsere Mehrbenutzereigenschaft. Jede Anforderung wird innerhalb ihrer eigenen Anforderung behandelt, und daher kann eine Anforderung mit kurzer Laufzeit eine zuvor gestartete Anforderung mit langer Laufzeit überlaufen. Falls Sie sich gefragt haben, kann die DefinitionserverLogic
, die zuvor gezeigt wurde, natürlich viel kürzer geschrieben werden, indem die meisten seiner inneren Definitionen eingefügt werden:Ein Test des Webservers kann folgendermaßen aussehen:
Damit das obige Codebeispiel ordnungsgemäß funktioniert, müssen wir zuerst den Server starten, der im
startServer
Skript dargestellt wird :Das vollständige Codebeispiel für diesen einfachen TCP-Server finden Sie hier . Wir können nicht nur einen Server mit Akka Streams schreiben, sondern auch den Client. Es kann so aussehen:
Den vollständigen Code-TCP-Client finden Sie hier . Der Code sieht ziemlich ähnlich aus, aber im Gegensatz zum Server müssen wir die eingehenden Verbindungen nicht mehr verwalten.
Komplexe Graphen
In den vorherigen Abschnitten haben wir gesehen, wie wir einfache Programme aus Flows erstellen können. In der Realität reicht es jedoch oft nicht aus, sich nur auf bereits integrierte Funktionen zu verlassen, um komplexere Streams zu erstellen. Wenn wir Akka Streams für beliebige Programme verwenden möchten, müssen wir wissen, wie wir unsere eigenen benutzerdefinierten Steuerungsstrukturen und kombinierbaren Abläufe erstellen, um die Komplexität unserer Anwendungen zu bewältigen. Die gute Nachricht ist, dass Akka Streams so konzipiert wurde, dass es mit den Bedürfnissen der Benutzer skaliert. Um Ihnen eine kurze Einführung in die komplexeren Teile von Akka Streams zu geben, fügen wir unserem Client / Server-Beispiel einige weitere Funktionen hinzu.
Eine Sache, die wir noch nicht tun können, ist das Schließen einer Verbindung. Ab diesem Zeitpunkt wird es etwas komplizierter, da die Stream-API, die wir bisher gesehen haben, es uns nicht erlaubt, einen Stream an einem beliebigen Punkt zu stoppen. Es gibt jedoch die
GraphStage
Abstraktion, mit der beliebige Grafikverarbeitungsstufen mit einer beliebigen Anzahl von Eingabe- oder Ausgabeports erstellt werden können. Schauen wir uns zunächst die Serverseite an, auf der wir eine neue Komponente mit dem Namen vorstellencloseConnection
:Diese API sieht viel umständlicher aus als die Flow-API. Kein Wunder, dass wir hier viele wichtige Schritte unternehmen müssen. Im Gegenzug haben wir mehr Kontrolle über das Verhalten unserer Streams. Im obigen Beispiel geben wir nur einen Eingabe- und einen Ausgabeport an und stellen sie dem System durch Überschreiben des
shape
Werts zur Verfügung. Weiterhin haben wir ein sogenanntesInHandler
und ein definiertOutHandler
, die in dieser Reihenfolge für das Empfangen und Senden von Elementen verantwortlich sind. Wenn Sie sich das Beispiel für einen vollständigen Klick-Stream genau angesehen haben, sollten Sie diese Komponenten bereits erkennen. In derInHandler
greifen wir ein Element und wenn es eine Zeichenfolge mit einem einzelnen Zeichen ist'q'
, möchten wir den Stream schließen. Um dem Client die Möglichkeit zu geben, herauszufinden, dass der Stream bald geschlossen wird, geben wir die Zeichenfolge aus"BYE"
und danach schließen wir sofort die Bühne. DiecloseConnection
Komponente kann über dievia
Methode, die im Abschnitt über Flüsse vorgestellt wurde, mit einem Stream kombiniert werden .Neben der Möglichkeit, Verbindungen zu schließen, wäre es auch schön, wenn wir eine Willkommensnachricht zu einer neu erstellten Verbindung anzeigen könnten. Dazu müssen wir noch einmal ein bisschen weiter gehen:
Die Funktion
serverLogic
nimmt nun die eingehende Verbindung als Parameter. In seinem Körper verwenden wir ein DSL, mit dem wir komplexes Stream-Verhalten beschreiben können. Mitwelcome
erstellen wir einen Stream, der nur ein Element ausgeben kann - die Willkommensnachricht.logic
ist das, was wieserverLogic
im vorherigen Abschnitt beschrieben wurde. Der einzige bemerkenswerte Unterschied ist, dass wir hinzugefügtcloseConnection
haben. Jetzt kommt tatsächlich der interessante Teil des DSL. DieGraphDSL.create
Funktion stellt einen Builder zurb
Verfügung, mit dem der Stream als Diagramm ausgedrückt wird. Mit der~>
Funktion ist es möglich, Eingangs- und Ausgangsanschlüsse miteinander zu verbinden. DieConcat
im Beispiel verwendete Komponente kann Elemente verketten und wird hier verwendet, um die Begrüßungsnachricht vor die anderen Elemente zu stellen, aus denen sie hervorgehtinternalLogic
. In der letzten Zeile stellen wir nur den Eingabeport der Serverlogik und den Ausgabeport des verketteten Streams zur Verfügung, da alle anderen Ports ein Implementierungsdetail derserverLogic
Komponente bleiben sollen . Eine ausführliche Einführung in die Grafik DSL von Akka Streams finden Sie im entsprechenden Abschnitt in der offiziellen Dokumentation . Das vollständige Codebeispiel des komplexen TCP-Servers und eines Clients, der mit ihm kommunizieren kann, finden Sie hier . Wenn Sie eine neue Verbindung vom Client aus öffnen, sollte eine Begrüßungsnachricht angezeigt werden. Wenn Sie"q"
auf dem Client eingeben , sollte eine Nachricht angezeigt werden, die Sie darüber informiert, dass die Verbindung abgebrochen wurde.Es gibt noch einige Themen, die in dieser Antwort nicht behandelt wurden. Insbesondere die Materialisierung mag den einen oder anderen Leser erschrecken, aber ich bin mir sicher, dass mit dem hier behandelten Material jeder in der Lage sein sollte, die nächsten Schritte für sich zu tun. Wie bereits erwähnt, ist die offizielle Dokumentation ein guter Ort, um weiter über Akka Streams zu lernen.
quelle