ES / CQRS-Parallelitätsbehandlung

20

Ich habe kürzlich angefangen, mich mit CQRS / ES zu beschäftigen, weil ich es möglicherweise bei der Arbeit anwenden muss. Es scheint in unserem Fall sehr vielversprechend, da es viele Probleme lösen würde.

Ich skizzierte mein grobes Verständnis, wie eine ES / CQRS-App in einem vereinfachten Banking-Anwendungsfall (Geld abheben) kontextualisiert aussehen sollte.

ES / CQRS

Um es zusammenzufassen, wenn Person A etwas Geld abhebt:

  • ein Befehl wird ausgegeben
  • Befehl wird zur Validierung / Verifizierung übergeben
  • Ein Ereignis wird in einen Ereignisspeicher verschoben, wenn die Überprüfung erfolgreich war
  • Ein Aggregator nimmt das Ereignis aus der Warteschlange, um Änderungen auf das Aggregat anzuwenden

Nach meinem Verständnis ist das Ereignisprotokoll die Quelle der Wahrheit, da es das Protokoll von FACTS ist, aus dem wir dann jede Projektion ableiten können.


Was ich in diesem großen Schema der Dinge nicht verstehe, ist das, was in diesem Fall passiert:

  • Regel: Ein Saldo kann nicht negativ sein
  • Person A hat einen Kontostand von 100e
  • Person A gibt einen WithdrawCommand von 100e aus
  • Validierungsausweise und MoneyWithdrewEvent des Ereignisses 100e werden ausgegeben
  • In der Zwischenzeit gibt Person A einen weiteren WithdrawCommand von 100e aus
  • Das erste MoneyWithdrewEvent wurde noch nicht aggregiert, daher besteht die Validierung, da die Validierungsprüfung für das Aggregat (das noch nicht aktualisiert wurde) durchgeführt wurde.
  • MoneyWithdrewEvent von 100e wird ein anderes Mal ausgegeben

==> Wir befinden uns in einem inkonsistenten Zustand mit einem Kontostand von -100e und das Protokoll enthält 2 MoneyWithdrewEvent

Soweit ich weiß, gibt es verschiedene Strategien, um mit diesem Problem umzugehen:

  • a) Fügen Sie die aggregierte Versions-ID zusammen mit dem Ereignis in den Ereignisspeicher ein. Wenn also beim Ändern eine Versionsinkongruenz auftritt, geschieht nichts
  • b) einige Sperrstrategien anwenden, was impliziert, dass die Verifizierungsschicht irgendwie eine erstellen muss

Fragen zu den Strategien:

  • a) In diesem Fall ist das Ereignisprotokoll nicht mehr die Quelle der Wahrheit, wie soll damit umgegangen werden? Außerdem sind wir zum Kunden zurückgekehrt, obwohl es völlig falsch war, die Auszahlung zuzulassen. Ist es in diesem Fall besser, Sperren zu verwenden?
  • b) Sperren == Deadlocks, haben Sie Einblicke in die Best Practices?

Ist mein Verständnis für den Umgang mit Parallelität insgesamt korrekt?

Hinweis: Ich verstehe, dass dieselbe Person in einem so kurzen Zeitfenster nicht zweimal Geld abheben kann, aber ich habe ein einfaches Beispiel angeführt, um mich nicht in Details zu verlieren

Louis F.
quelle
Aktualisieren Sie das Aggregat in Schritt 4, anstatt auf Schritt 7 zu warten.
Erik Eidt
Sie meinen also, dass der Ereignisspeicher in diesem Fall nur ein Protokoll ist, das erst beim Starten der Anwendung gelesen wird, um Aggregate / andere Projektionen neu zu erstellen?
Louis F.

Antworten:

19

Ich skizzierte mein grobes Verständnis, wie eine ES / CQRS-App in einem vereinfachten Banking-Anwendungsfall (Geld abheben) kontextualisiert aussehen sollte.

Dies ist das perfekte Beispiel für eine Event-Source-Anwendung. Lasst uns beginnen.

Jedes Mal, wenn ein Befehl verarbeitet oder wiederholt wird (Sie werden es verstehen, seien Sie geduldig), werden die folgenden Schritte ausgeführt:

  1. der befehl erreicht einen befehlshandler, dh einen service in der Application layer.
  2. Der Befehlshandler identifiziert den Aggregateund lädt ihn aus dem Repository. In diesem Fall wird der Ladevorgang ausgeführt, indem neweine AggregateInstanz abgerufen, alle zuvor ausgegebenen Ereignisse dieses Aggregats abgerufen und erneut auf das Aggregat selbst angewendet werden. Die Aggregatversion wird für gespeichert spätere Verwendung; nachdem die Ereignisse angewendet wurden, befindet sich das Aggregat in seinem endgültigen Zustand - dh der aktuelle Kontostand wird als Zahl berechnet)
  3. der befehlshandler ruft die entsprechende methode auf dem Aggregatelike auf Account::withdrawMoney(100)und sammelt die ausgegebenen ereignisse, dh MoneyWithdrewEvent(AccountId, 100); Wenn sich nicht genügend Geld auf dem Konto befindet (Kontostand <100), wird eine Ausnahme ausgelöst und alles abgebrochen. Andernfalls wird der nächste Schritt ausgeführt.
  4. Der Befehlshandler versucht, die Aggregateim Repository zu speichern (in diesem Fall ist das Repository das Event Store). Dazu werden die neuen Ereignisse Event streamnur dann an das angefügt, wenn das versionvon Aggregateimmer noch das ist, das beim AggregateLaden des Ereignisses war. Wenn die Version nicht identisch ist, wird der Befehl wiederholt - fahren Sie mit Schritt 1 fort . Ist das versiondasselbe, werden die Ereignisse an das angefügt Event streamund der Client erhält den SuccessStatus.

Diese Versionsprüfung wird als optimistisches Sperren bezeichnet und ist ein allgemeiner Sperrmechanismus. Ein anderer Mechanismus ist das pessimistische Sperren, wenn andere Schriften blockiert werden (wie bei nicht gestartet), bis die aktuelle abgeschlossen ist.

Der Begriff Event streamist eine Abstraktion aller Ereignisse, die von demselben Aggregat ausgegeben wurden.

Sie sollten verstehen, dass dies Event storenur eine andere Art der Persistenz ist, bei der alle Änderungen an einem Aggregat gespeichert werden, nicht nur der Endzustand.

a) In diesem Fall ist das Ereignisprotokoll nicht mehr die Quelle der Wahrheit, wie soll damit umgegangen werden? Außerdem sind wir zum Kunden zurückgekehrt, obwohl es völlig falsch war, die Auszahlung zuzulassen. Ist es in diesem Fall besser, Sperren zu verwenden?

Der Event Store ist immer die Quelle der Wahrheit.

b) Sperren == Deadlocks, haben Sie Einblicke in die Best Practices?

Wenn Sie optimistisches Sperren verwenden, gibt es keine Sperren, Sie müssen nur den Befehl wiederholen.

Wie auch immer, Locks! = Deadlocks

Constantin Galbenu
quelle
2
Es gibt einige Optimierungen in Bezug auf das Laden von AggregateEreignissen, bei denen Sie nicht alle Ereignisse anwenden, sondern eine Momentaufnahme der Ereignisse Aggregatebis zu einem bestimmten Zeitpunkt in der Vergangenheit erstellen und nur die Ereignisse anwenden, die nach diesem Zeitpunkt aufgetreten sind.
Constantin Galbenu
Ok, ich glaube, meine Verwirrung rührt von der Tatsache her, dass Event Store == Event Bus (ich habe Kafka im Sinn), weshalb die Neuerstellung des Aggregats kostspielig sein kann, da Sie möglicherweise eine Menge Ereignisse neu lesen müssen. AggregateWenn Sie einen Schnappschuss von haben , wann sollte der Schnappschuss aktualisiert werden? Entspricht der Snapshot-Speicher dem Ereignisspeicher oder handelt es sich um eine materialisierte Ansicht, die vom Ereignisbus abgeleitet wurde?
Louis F.
Es gibt einige Strategien zum Erstellen des Schnappschusses. Eine ist, alle n Ereignisse einen Schnappschuss zu machen. Sie sollten den Snapshot zusammen mit den Ereignissen an derselben Stelle / Persistenz / Datenbank mit demselben Commit speichern. Die Idee ist, dass der Schnappschuss stark mit der Version des Aggregats verwandt ist.
Constantin Galbenu
Ok, ich glaube, ich habe eine klarere Vorstellung davon, wie ich damit umgehen soll. Nun letzte Frage, welche Rolle spielt der Eventbus am Ende? Werden die Aggregate synchron aktualisiert?
Louis F.
1
Ja, Sie können einen RabbitMQ-Kanal oder einen beliebigen Kanal verwenden, um die Ereignisse asynchron an die gelesenen Modelle zu senden, jedoch erst, nachdem Sie sie im Ereignisspeicher gespeichert haben. In der Tat wird keine Ereignisvalidierung durchgeführt, nachdem sie bestehen geblieben sind: Die Ereignisse stellen Tatsachen dar, die geschehen sind; einem gelesenen Modell mag oder mag nicht, dass etwas passiert ist, aber es kann den Verlauf nicht ändern.
Constantin Galbenu
1

Ich skizzierte mein grobes Verständnis, wie eine ES / CQRS-App in einem vereinfachten Banking-Anwendungsfall (Geld abheben) kontextualisiert aussehen sollte.

Schließen. Das Problem ist, dass sich die Logik zum Aktualisieren Ihres "Aggregats" an einem seltsamen Ort befindet.

Die üblichere Implementierung besteht darin, dass das Datenmodell, das Ihr Befehlshandler im Speicher aufbewahrt, und der Ereignisstrom im Ereignisspeicher synchronisiert bleiben.

Ein leicht zu beschreibendes Beispiel ist der Fall, in dem der Befehlshandler synchrone Schreibvorgänge in den Ereignisspeicher ausführt und seine lokale Kopie des Modells aktualisiert, wenn die Verbindung zum Ereignisspeicher anzeigt, dass der Schreibvorgang erfolgreich war.

Wenn der Befehlshandler erneut mit dem Ereignisspeicher synchronisiert werden muss (da sein internes Modell nicht mit dem des Speichers übereinstimmt), wird der Verlauf aus dem Speicher geladen und der eigene interne Status neu erstellt.

Mit anderen Worten, die Pfeile 2 und 3 (falls vorhanden) wären normalerweise mit dem Ereignisspeicher verbunden, nicht mit einem Aggregatspeicher.

Fügen Sie die aggregierte Versions-ID zusammen mit dem Ereignis in den Ereignisspeicher ein. Wenn also beim Ändern eine Versionsinkongruenz auftritt, geschieht nichts

Variationen davon sind der übliche Fall. Anstatt sie an den Stream im Ereignis-Stream anzuhängen , legen wir sie normalerweise an einer bestimmten Stelle im Stream ab. Wenn dieser Vorgang nicht mit dem Status des Speichers kompatibel ist, schlägt der Schreibvorgang fehl, und der Dienst kann den geeigneten Fehlermodus auswählen (Fehler beim Client, erneuter Versuch, Zusammenführen ....). Die Verwendung von idempotenten Schreibvorgängen löst eine Reihe von Problemen bei verteiltem Messaging, erfordert jedoch natürlich einen Speicher, der ein idempotentes Schreiben unterstützt.

VoiceOfUnreason
quelle
Hmm ich glaube ich habe dann die Event Store Komponente falsch verstanden. Ich dachte, dass alles durchgehen sollte und gestreamt wird. Was ist, wenn mein Event-Store ein Kafka ist und schreibgeschützt ist? Ich kann es mir nicht leisten, in Schritt 2 und 3 alle Nachrichten noch einmal durchzugehen. Es scheint, dass insgesamt meine Vision dieser entsprach: medium.com/technology-learning/…
Louis F.