Wir haben eine Situation, in der ich mit einem massiven Zustrom von Ereignissen zu kämpfen habe, die auf unseren Server kommen, durchschnittlich mit etwa 1000 Ereignissen pro Sekunde (Höchststand könnte ~ 2000 sein).
Das Problem
Unser System wird auf Heroku gehostet und verwendet eine relativ teure Heroku Postgres DB , die maximal 500 DB-Verbindungen ermöglicht. Wir verwenden Connection Pooling, um eine Verbindung vom Server zur Datenbank herzustellen.
Ereignisse gehen schneller ein, als der DB-Verbindungspool verarbeiten kann
Das Problem ist, dass Ereignisse schneller eintreten, als der Verbindungspool verarbeiten kann. Sobald eine Verbindung den Netzwerk-Roundtrip vom Server zur Datenbank beendet hat, kann sie wieder in den Pool freigegeben werden, und es treten mehr als n
zusätzliche Ereignisse auf.
Schließlich häufen sich die Ereignisse und warten darauf, gespeichert zu werden. Da keine Verbindungen im Pool verfügbar sind, tritt eine Zeitüberschreitung auf, und das gesamte System wird außer Betrieb gesetzt.
Wir haben den Notfall gelöst, indem wir die störenden Hochfrequenzereignisse langsamer von den Kunden gesendet haben. Wir möchten jedoch weiterhin wissen, wie wir mit diesen Szenarien umgehen können, falls wir diese Hochfrequenzereignisse verarbeiten müssen.
Einschränkungen
Andere Clients möchten möglicherweise Ereignisse gleichzeitig lesen
Andere Clients fordern ständig an, alle Ereignisse mit einem bestimmten Schlüssel zu lesen, auch wenn sie noch nicht in der Datenbank gespeichert sind.
Ein Client kann GET api/v1/events?clientId=1
alle von Client 1 gesendeten Ereignisse abfragen und abrufen, auch wenn diese Ereignisse noch nicht in der Datenbank gespeichert wurden.
Gibt es Beispiele für den Umgang mit "Klassenzimmern"?
Mögliche Lösungen
Stellen Sie die Ereignisse auf unserem Server in eine Warteschlange
Wir könnten die Ereignisse auf dem Server in eine Warteschlange einreihen (wobei die Warteschlange eine maximale Parallelität von 400 aufweist, damit der Verbindungspool nicht ausgeht).
Das ist eine schlechte Idee, weil:
- Der verfügbare Serverspeicher wird aufgebraucht. Die gestapelten Ereignisse in der Warteschlange belegen enorm viel RAM.
- Unsere Server werden alle 24 Stunden neu gestartet . Dies ist eine von Heroku auferlegte harte Grenze . Der Server kann neu gestartet werden, während Ereignisse in die Warteschlange gestellt werden, wodurch die in die Warteschlange gestellten Ereignisse verloren gehen.
- Es wird der Status auf dem Server eingeführt, wodurch die Skalierbarkeit beeinträchtigt wird. Wenn wir ein Multi-Server-Setup haben und ein Client alle in die Warteschlange eingereihten + gespeicherten Ereignisse lesen möchte, wissen wir nicht, auf welchem Server die in die Warteschlange eingereihten Ereignisse aktiv sind.
Verwenden Sie eine separate Nachrichtenwarteschlange
Ich nehme an, wir könnten eine Nachrichtenwarteschlange verwenden (wie RabbitMQ ?), In der wir die Nachrichten pumpen und auf der anderen Seite gibt es einen anderen Server, der nur die Ereignisse in der Datenbank speichert.
Ich bin nicht sicher, ob in Nachrichtenwarteschlangen Ereignisse in der Warteschlange abgefragt werden können (die noch nicht gespeichert wurden). Wenn also ein anderer Client die Nachrichten eines anderen Clients lesen möchte, kann ich nur die gespeicherten Nachrichten aus der Datenbank und die ausstehenden Nachrichten aus der Warteschlange abrufen und verketten Sie sie zusammen, damit ich sie an den Leseanforderungsclient zurücksenden kann.
Verwenden Sie mehrere Datenbanken, von denen jede einen Teil der Nachrichten mit einem zentralen DB-Koordinator-Server speichert, um sie zu verwalten
Eine andere Lösung besteht darin, mehrere Datenbanken mit einem zentralen "DB-Koordinator / Load Balancer" zu verwenden. Bei Erhalt eines Ereignisses würde dieser Koordinator eine der Datenbanken auswählen, in die die Nachricht geschrieben werden soll. Dies sollte es uns ermöglichen, mehrere Heroku-Datenbanken zu verwenden, wodurch das Verbindungslimit auf 500 x Anzahl von Datenbanken erhöht wird.
Bei einer Leseabfrage könnte dieser Koordinator SELECT
Abfragen an jede Datenbank senden, alle Ergebnisse zusammenführen und sie an den Client zurücksenden, der den Lesevorgang angefordert hat.
Das ist eine schlechte Idee, weil:
- Diese Idee klingt wie ... ähm ... Überentwicklung? Wäre auch ein Albtraum (Backups etc ..). Es ist kompliziert zu bauen und zu warten und wenn es nicht unbedingt notwendig ist, klingt es wie eine KISS- Verletzung.
- Es opfert Beständigkeit . Das Ausführen von Transaktionen über mehrere Datenbanken hinweg ist bei dieser Idee kein Problem.
quelle
ANALYZE
die Abfragen selbst ausgeführt und sie sind kein Problem. Ich habe auch einen Prototyp erstellt, um die Verbindungspoolhypothese zu testen und zu überprüfen, ob dies tatsächlich das Problem ist. Die Datenbank und der Server selbst befinden sich auf unterschiedlichen Rechnern, daher die Latenz. Außerdem möchten wir Heroku nicht aufgeben, es sei denn, dies ist absolut notwendig. Die Unbesorgtheit über den Einsatz ist ein großes Plus für uns.select null
mit 500 Verbindungen. Ich wette, Sie werden feststellen, dass der Verbindungspool dort nicht das Problem ist.Antworten:
Eingabestrom
Es ist nicht klar, ob Ihre 1000 Ereignisse / Sekunde Spitzenwerte darstellen oder ob es sich um eine kontinuierliche Last handelt:
Vorgeschlagene Lösung
In beiden Fällen würde ich mich intuitiv für einen Kafka- basierten Event-Stream entscheiden:
Dies ist auf allen Ebenen hoch skalierbar:
Anbieten von Ereignissen, die noch nicht in die DB geschrieben wurden, an Clients
Sie möchten, dass Ihre Kunden auch auf Informationen zugreifen können, die sich noch in der Pipe befinden und noch nicht in die DB geschrieben wurden. Das ist etwas heikler.
Option 1: Verwenden eines Caches zur Ergänzung von Datenbankabfragen
Ich habe nicht eingehend analysiert, aber die erste Idee, die mir einfällt, ist, den / die Abfrageprozessor (en) zu einem (n) Verbraucher (n) der Kafka-Themen zu machen, aber zu einer anderen Kafka-Verbrauchergruppe . Der Anforderungsprozessor würde dann alle Nachrichten empfangen, die der DB-Schreiber empfangen wird, jedoch unabhängig. Sie könnten dann in einem lokalen Cache gespeichert werden. Die Abfragen würden dann auf DB + Cache (+ Beseitigung von Duplikaten) ausgeführt.
Das Design würde dann so aussehen:
Die Skalierbarkeit dieser Abfrageschicht könnte durch Hinzufügen weiterer Abfrageprozessoren (jeder in seiner eigenen Konsumentengruppe) erreicht werden.
Option 2: Entwerfen Sie eine duale API
Ein besserer Ansatz für IMHO wäre, eine duale API anzubieten (nutzen Sie den Mechanismus der separaten Verbrauchergruppe):
Der Vorteil ist, dass Sie den Kunden entscheiden lassen, was interessant ist. Dadurch wird möglicherweise vermieden, dass Sie DB-Daten systematisch mit frisch eingezahlten Daten zusammenführen, wenn der Client nur an neuen eingehenden Ereignissen interessiert ist. Wenn das empfindliche Zusammenführen von neuen und archivierten Ereignissen wirklich erforderlich ist, muss der Kunde dies organisieren.
Varianten
Ich habe kafka vorgeschlagen, da es für sehr hohe Volumes mit beständigen Nachrichten ausgelegt ist, sodass Sie die Server bei Bedarf neu starten können.
Mit RabbitMQ können Sie eine ähnliche Architektur erstellen. Wenn Sie jedoch dauerhafte Warteschlangen benötigen, kann dies die Leistung beeinträchtigen . Soweit ich weiß, ist der einzige Weg, mit RabbitMQ den parallelen Verbrauch derselben Nachrichten durch mehrere Leser (z. B. Writer + Cache) zu erreichen, das Klonen der Warteschlangen . Daher kann eine höhere Skalierbarkeit zu einem höheren Preis führen.
quelle
a distributed database (for example using a specialization of the server by group of keys)
? Auch warum Kafka statt RabbitMQ? Gibt es einen bestimmten Grund, sich für einen anderen zu entscheiden?Use multiple databases
Idee, aber Sie sagen, ich sollte die Nachrichten nicht einfach nach dem Zufallsprinzip (oder im Round-Robin-Verfahren) an jede der Datenbanken verteilen. Richtig?Ich vermute, Sie müssen einen Ansatz, den Sie abgelehnt haben, genauer untersuchen
Mein Vorschlag wäre, die verschiedenen Artikel über die LMAX-Architektur durchzulesen . Sie haben es geschafft, Massen-Batching für ihren Anwendungsfall durchzuführen, und es ist möglich, dass Ihre Kompromisse denen ähneln.
Vielleicht möchten Sie auch sehen, ob Sie die Lesevorgänge aus dem Weg räumen können - idealerweise möchten Sie sie unabhängig von den Schreibvorgängen skalieren können. Dies kann bedeuten, dass Sie sich mit CQRS (Command Query Responsibility Segregation) befassen.
In einem verteilten System können Sie ziemlich sicher sein, dass Nachrichten verloren gehen. Möglicherweise können Sie einige der Auswirkungen abmildern, indem Sie Ihre Sequenzbarrieren berücksichtigen (z. B. sicherstellen, dass das Schreiben in einen dauerhaften Speicher erfolgt, bevor das Ereignis außerhalb des Systems geteilt wird).
Vielleicht - ich würde eher auf Ihre Geschäftsgrenzen schauen, um zu sehen, ob es natürliche Orte gibt, an denen die Daten verloren gehen.
Nun, ich nehme an, dass es welche geben könnte, aber dorthin wollte ich nicht. Der Punkt ist, dass das Design die Robustheit haben sollte, die erforderlich ist, um trotz Nachrichtenverlust Fortschritte zu erzielen.
Dies sieht häufig nach einem Pull-basierten Modell mit Benachrichtigungen aus. Der Anbieter schreibt die Nachrichten in einen bestellten dauerhaften Speicher. Der Verbraucher holt die Nachrichten aus dem Geschäft und verfolgt dabei seine eigene Höchstmarke. Push-Benachrichtigungen werden als latenzreduzierendes Gerät verwendet. Wenn die Benachrichtigung jedoch verloren geht, wird die Nachricht (möglicherweise) immer noch abgerufen, da der Consumer nach einem regelmäßigen Zeitplan abruft (der Unterschied besteht darin, dass der Abruf bei Erhalt der Benachrichtigung früher erfolgt ).
Siehe Zuverlässiges Messaging ohne verteilte Transaktionen von Udi Dahan (bereits von Andy referenziert ) und Polyglot Data von Greg Young.
quelle
In a distributed system, I think you can be pretty confident that messages are going to get lost
. "Ja wirklich?" Gibt es Fälle, in denen der Verlust von Daten ein akzeptabler Kompromiss ist? Ich hatte den Eindruck, dass Datenverlust ein Fehler ist.Wenn ich richtig verstehe, ist der Stromfluss:
In diesem Fall besteht meiner Meinung nach die erste Änderung im Design darin, dass Sie nicht mehr bei jedem Ereignis den Code zurückgeben müssen, mit dem Sie gerade arbeiten. Erstellen Sie stattdessen einen Pool von Einfügungsthreads / -prozessen, der 1: 1 mit der Anzahl der DB-Verbindungen ist. Diese halten jeweils eine dedizierte DB-Verbindung.
Wenn Sie eine Art gleichzeitige Warteschlange verwenden, rufen diese Threads Nachrichten aus der gleichzeitigen Warteschlange ab und fügen sie ein. Theoretisch müssen sie nie die Verbindung zum Pool zurückgeben oder eine neue anfordern, aber Sie müssen möglicherweise die Behandlung einbauen, falls die Verbindung fehlerhaft wird. Es ist möglicherweise am einfachsten, den Thread / Prozess abzubrechen und einen neuen zu starten.
Dadurch sollte der Overhead des Verbindungspools effektiv beseitigt werden. Natürlich müssen Sie in der Lage sein, mindestens 1000 / connections-Ereignisse pro Sekunde auf jede Verbindung zu pushen. Möglicherweise möchten Sie eine andere Anzahl von Verbindungen ausprobieren, da 500 Verbindungen in denselben Tabellen zu Konflikten in der Datenbank führen können, aber das ist eine ganz andere Frage. Eine andere zu berücksichtigende Sache ist die Verwendung von Batch-Inserts, dh jeder Thread zieht eine Anzahl von Nachrichten und schiebt sie auf einmal ein. Vermeiden Sie außerdem, dass mehrere Verbindungen versuchen, dieselben Zeilen zu aktualisieren.
quelle
Annahmen
Ich gehe davon aus, dass die von Ihnen beschriebene Last konstant ist, da dies das schwieriger zu lösende Szenario ist.
Ich gehe auch davon aus, dass Sie eine Möglichkeit haben, ausgelöste Workloads mit langer Laufzeit außerhalb Ihres Webanwendungsprozesses auszuführen.
Lösung
Angenommen, Sie haben Ihren Engpass - die Latenz zwischen Ihrem Prozess und der Postgres-Datenbank - richtig erkannt. Dies ist das Hauptproblem, das Sie lösen müssen. Die Lösung muss Ihre Konsistenzbeschränkung mit anderen Kunden berücksichtigen, die die Ereignisse so schnell wie möglich nach deren Eingang lesen möchten.
Um das Latenzproblem zu lösen, müssen Sie so arbeiten, dass die Latenz pro zu speicherndem Ereignis auf ein Minimum reduziert wird. Dies ist der Schlüssel, den Sie erreichen müssen, wenn Sie nicht bereit oder in der Lage sind, die Hardware zu ändern . Wenn Sie PaaS-Dienste nutzen und keine Kontrolle über Hardware oder Netzwerk haben, können Sie die Latenz pro Ereignis nur durch eine Art Stapelschreiben von Ereignissen reduzieren.
Sie müssen lokal eine Warteschlange mit Ereignissen speichern, die regelmäßig in Ihre Datenbank geschrieben und gelöscht werden, entweder sobald sie eine bestimmte Größe erreicht hat oder nach Ablauf einer bestimmten Zeitspanne. Ein Prozess muss diese Warteschlange überwachen, um das Leeren des Speichers auszulösen. Es sollte eine Vielzahl von Beispielen für die Verwaltung einer gleichzeitigen Warteschlange geben, die regelmäßig in der Sprache Ihrer Wahl geleert wird. Dies ist ein Beispiel in C # aus der regelmäßigen Batching-Senke der beliebten Serilog-Protokollierungsbibliothek.
Diese SO-Antwort beschreibt den schnellsten Weg, Daten in Postgres zu löschen - obwohl es erforderlich wäre, dass Ihr Batching die Warteschlange auf der Festplatte speichert, und es wahrscheinlich ein Problem gibt, das dort behoben werden kann, wenn Ihre Festplatte beim Neustart in Heroku verschwindet.
Zwang
Eine andere Antwort hat bereits CQRS erwähnt , und das ist der richtige Ansatz, um die Einschränkung zu lösen. Sie möchten Lesemodelle hydratisieren, während jedes Ereignis verarbeitet wird. Ein Mediator-Muster kann dabei helfen, ein Ereignis zu kapseln und an mehrere Handler zu verteilen, die gerade im Prozess sind. So kann ein Handler das Ereignis zu Ihrem Lesemodell hinzufügen, das sich im Arbeitsspeicher befindet, den Clients abfragen können, und ein anderer Handler kann für das Einreihen des Ereignisses in die Warteschlange verantwortlich sein, damit es schließlich im Stapel geschrieben werden kann.
Der Hauptvorteil von CQRS besteht darin, dass Sie Ihre konzeptionellen Lese- und Schreibmodelle entkoppeln. Dies ist eine originelle Art zu sagen, dass Sie in ein Modell schreiben und von einem anderen, völlig anderen Modell lesen. Um Skalierbarkeitsvorteile von CQRS zu erhalten, sollten Sie in der Regel sicherstellen, dass jedes Modell separat auf eine Weise gespeichert wird, die für seine Verwendungsmuster optimal ist. In diesem Fall können wir ein aggregiertes Lesemodell verwenden, z. B. einen Redis-Cache oder einfach In-Memory, um sicherzustellen, dass unsere Lesevorgänge schnell und konsistent sind, während wir weiterhin unsere Transaktionsdatenbank zum Schreiben unserer Daten verwenden.
quelle
Dies ist ein Problem, wenn für jeden Prozess eine Datenbankverbindung erforderlich ist. Das System sollte so konzipiert sein, dass Sie über einen Pool von Mitarbeitern verfügen, bei dem jeder Mitarbeiter nur eine Datenbankverbindung benötigt und jeder Mitarbeiter mehrere Ereignisse verarbeiten kann.
Die Nachrichtenwarteschlange kann mit diesem Design verwendet werden. Sie benötigen Nachrichtenproduzenten, die Ereignisse an die Nachrichtenwarteschlange senden, und die Worker (Konsumenten) verarbeiten die Nachrichten aus der Warteschlange.
Diese Einschränkung ist nur möglich, wenn die Ereignisse in der Datenbank ohne Verarbeitung gespeichert sind (Rohereignisse). Wenn Ereignisse verarbeitet werden, bevor sie in der Datenbank gespeichert werden, können sie nur über die Datenbank abgerufen werden.
Wenn die Clients nur unformatierte Ereignisse abfragen möchten, würde ich die Verwendung einer Suchmaschine wie Elastic Search vorschlagen. Sie erhalten sogar die Abfrage- / Such-API kostenlos.
Da es Ihnen wichtig erscheint, Ereignisse abzufragen, bevor sie in der Datenbank gespeichert werden, sollte eine einfache Lösung wie Elastic Search funktionieren. Sie speichern einfach alle Ereignisse darin und duplizieren nicht dieselben Daten, indem Sie sie in die Datenbank kopieren.
Das Skalieren von Elastic Search ist einfach, aber selbst mit der Grundkonfiguration ist es sehr performant.
Wenn Sie eine Verarbeitung benötigen, kann Ihr Prozess die Ereignisse von ES abrufen, verarbeiten und in der Datenbank speichern. Ich weiß nicht, welches Leistungsniveau Sie für diese Verarbeitung benötigen, aber es wäre völlig unabhängig von der Abfrage der Ereignisse von ES. Sie sollten sowieso keine Verbindungsprobleme haben, da Sie eine feste Anzahl von Arbeitern mit jeweils einer Datenbankverbindung haben können.
quelle
1k oder 2k Ereignisse (5KB) pro Sekunde sind für eine Datenbank nicht so wichtig, wenn sie über ein geeignetes Schema und eine entsprechende Speicherengine verfügt. Wie von @eddyce vorgeschlagen, kann ein Master mit einem oder mehreren Slaves die Leseabfragen vom Festschreiben von Schreibvorgängen trennen. Wenn Sie weniger DB-Verbindungen verwenden, erhalten Sie einen besseren Gesamtdurchsatz.
Für diese Anforderungen müssten sie auch aus der Master-Datenbank lesen, da es eine Replikationsverzögerung für die Leseslaves geben würde.
Ich habe (Percona) MySQL mit TokuDB-Engine für sehr umfangreiche Schreibvorgänge verwendet. Es gibt auch eine auf LSMtrees basierende MyRocks-Engine, die gut zum Schreiben geeignet ist. Für diese beiden Engines und wahrscheinlich auch für PostgreSQL gibt es Einstellungen für die Transaktionsisolation sowie für das Synchronisationsverhalten, wodurch die Schreibkapazität erheblich erhöht werden kann. In der Vergangenheit haben wir bis zu 1s verlorene Daten akzeptiert, die dem db-Client als festgeschrieben gemeldet wurden. In anderen Fällen gab es batteriegepufferte SSDs, um Verluste zu vermeiden.
Amazon RDS Aurora in der MySQL-Variante soll einen 6-fach höheren Schreibdurchsatz bei kostengünstiger Replikation aufweisen (vergleichbar mit Slaves, die ein Dateisystem mit dem Master teilen). Die Aurora PostgreSQL-Variante verfügt auch über einen anderen fortschrittlichen Replikationsmechanismus.
quelle
Ich würde Heroku alle zusammen fallen lassen, das heißt, ich würde einen zentralisierten Ansatz fallen lassen: Mehrere Schreibvorgänge, die die maximale Poolverbindung erreichen, sind einer der Hauptgründe, warum DB-Cluster erfunden wurden, hauptsächlich, weil Sie das Schreiben nicht laden db (s) mit leseanforderungen, die von anderen db im cluster ausgeführt werden können, würde ich darüber hinaus mit einer master-slave-topologie versuchen - wie bereits erwähnt, würde es eine eigene db-installation ermöglichen, das ganze zu optimieren System, um sicherzustellen, dass die Weitergabezeit der Abfrage korrekt gehandhabt wird.
Viel Glück
quelle