Ich bin dabei, ein System zu entwerfen, das eine Verbindung zu einem oder mehreren Datenfeeds herstellt und eine Analyse der Daten durchführt, um Ereignisse basierend auf dem Ergebnis auszulösen. In einem typischen Multithread-Producer / Consumer-Setup werden mehrere Producer-Threads Daten in eine Warteschlange stellen und mehrere Consumer-Threads die Daten lesen. Die Konsumenten interessieren sich nur für den neuesten Datenpunkt plus n Anzahl von Punkten. Die Producer-Threads müssen blockieren, wenn der langsame Consumer nicht mithalten kann, und natürlich werden die Consumer-Threads blockiert, wenn keine unverarbeiteten Updates vorhanden sind. Die Verwendung einer typischen gleichzeitigen Warteschlange mit Lese- / Schreibsperre funktioniert gut, aber die eingehende Datenrate kann sehr hoch sein. Daher wollte ich meinen Sperraufwand reduzieren, insbesondere die Schreibsperren für die Produzenten. Ich denke, ein kreisförmiger, sperrenfreier Puffer ist das, was ich brauchte.
Nun zwei Fragen:
Ist ein kreisförmiger sperrenfreier Puffer die Antwort?
Wenn ja, bevor ich meine eigenen rolle, kennen Sie eine öffentliche Implementierung, die meinen Anforderungen entspricht?
Hinweise zur Implementierung eines zirkularen sperrenfreien Puffers sind immer willkommen.
Übrigens, dies in C ++ unter Linux.
Einige zusätzliche Informationen:
Die Reaktionszeit ist für mein System entscheidend. Im Idealfall möchten die Consumer-Threads, dass Aktualisierungen so schnell wie möglich eingehen, da eine zusätzliche Verzögerung von 1 Millisekunde das System wertlos machen oder viel weniger wert sein kann.
Die Designidee, zu der ich mich neige, ist ein halbverriegelungsfreier Ringpuffer, in dem der Producer-Thread Daten so schnell wie möglich in den Puffer legt. Rufen wir den Kopf des Puffers A auf, ohne ihn zu blockieren, es sei denn, der Puffer ist voll, wenn A trifft auf das Ende des Puffers Z. Consumer-Threads enthalten jeweils zwei Zeiger auf den Kreispuffer P und P n , wobei P der lokale Pufferkopf des Threads ist und P n das n-te Element nach P ist. Jeder Consumer-Thread rückt sein P vor und P n, sobald es die Verarbeitung des Stroms P beendet hat und das Ende des Pufferzeigers Z mit dem langsamsten P n vorgerückt ist. Wenn P A erreicht, was bedeutet, dass kein neues Update mehr verarbeitet werden muss, dreht sich der Verbraucher und wartet damit, dass A wieder vorrückt. Wenn sich der Consumer-Thread zu lange dreht, kann er in den Ruhezustand versetzt werden und auf eine Bedingungsvariable warten, aber ich bin damit einverstanden, dass der Consumer den CPU-Zyklus in Anspruch nimmt und auf ein Update wartet, da dies meine Latenz nicht erhöht (ich werde mehr CPU-Kerne haben als Fäden). Stellen Sie sich vor, Sie haben eine kreisförmige Spur und der Produzent läuft vor einer Gruppe von Verbrauchern. Der Schlüssel besteht darin, das System so abzustimmen, dass der Produzent normalerweise nur ein paar Schritte vor den Verbrauchern läuft, und die meisten dieser Vorgänge können es sein erfolgt mit sperrfreien Techniken. Ich verstehe, dass es nicht einfach ist, die Details der Implementierung richtig zu machen ... okay, sehr schwer, deshalb möchte ich aus den Fehlern anderer lernen, bevor ich einige meiner eigenen mache.
quelle
Antworten:
Ich habe in den letzten Jahren eine spezielle Studie über sperrenfreie Datenstrukturen durchgeführt. Ich habe die meisten Zeitungen auf dem Gebiet gelesen (es gibt nur ungefähr vierzig oder so - obwohl nur ungefähr zehn oder fünfzehn wirklich nützlich sind :-)
AFAIK, ein sperrenfreier Kreispuffer, wurde nicht erfunden. Das Problem wird darin bestehen, sich mit dem komplexen Zustand zu befassen, in dem ein Leser einen Schriftsteller überholt oder umgekehrt.
Wenn Sie nicht mindestens sechs Monate damit verbracht haben, sperrenfreie Datenstrukturen zu studieren, versuchen Sie nicht, selbst eine zu schreiben. Sie werden es falsch verstehen und es ist Ihnen möglicherweise nicht klar, dass Fehler vorliegen, bis Ihr Code nach der Bereitstellung auf neuen Plattformen fehlschlägt.
Ich glaube jedoch, dass es eine Lösung für Ihre Anforderung gibt.
Sie sollten eine Warteschlange ohne Sperre mit einer Liste ohne Sperren koppeln.
Die kostenlose Liste gibt Ihnen eine Vorabzuweisung und vermeidet so die (steuerlich teure) Anforderung eines sperrfreien Zuweisers. Wenn die freie Liste leer ist, replizieren Sie das Verhalten eines Umlaufpuffers, indem Sie ein Element sofort aus der Warteschlange entfernen und es stattdessen verwenden.
(Natürlich ist das Erhalten eines Elements in einem auf Sperren basierenden Kreispuffer nach Erreichen der Sperre sehr schnell - im Grunde genommen nur eine Zeiger-Dereferenzierung -, aber Sie werden dies in keinem sperrenfreien Algorithmus erhalten; sie müssen häufig gehen Der Aufwand für das Fehlschlagen eines Popups mit freier Liste, gefolgt von einer Warteschlange, entspricht dem Arbeitsaufwand, den ein sperrfreier Algorithmus leisten muss.
Michael und Scott haben 1996 eine wirklich gute Warteschlange ohne Sperren entwickelt. Über einen der folgenden Links erhalten Sie genügend Details, um das PDF ihres Papiers aufzuspüren. Michael und Scott, FIFO
Eine sperrenfreie freie Liste ist der einfachste sperrfreie Algorithmus, und tatsächlich glaube ich nicht, dass ich ein aktuelles Papier dafür gesehen habe.
quelle
volatile
Speicherzugriffe werden nicht mit anderenvolatile
Zugriffen neu angeordnet. Aber in ISO C ist das alles, was Sie bekommen. In MSVCvolatile
geht es weit darüber hinaus, aber heutzutage sollten Sie nurstd::atomic
mitmemory_order_release
oderseq_cst
oder was auch immer Sie wollen verwenden.Der Kunstbegriff für das, was Sie wollen, ist eine Warteschlange ohne Sperren . Es gibt eine exzellente Reihe von Notizen mit Links zu Code und Papieren von Ross Bencina. Der Typ, dessen Arbeit ich am meisten vertraue, ist Maurice Herlihy (für Amerikaner spricht er seinen Vornamen wie "Morris" aus).
quelle
Die Anforderung, dass Produzenten oder Konsumenten blockieren, wenn der Puffer leer oder voll ist, legt nahe, dass Sie eine normale Sperrdatenstruktur mit Semaphoren oder Bedingungsvariablen verwenden sollten, damit Produzenten und Konsumenten blockieren, bis Daten verfügbar sind. Sperrfreier Code blockiert unter solchen Bedingungen im Allgemeinen nicht - er dreht oder bricht Vorgänge ab, die nicht ausgeführt werden können, anstatt mit dem Betriebssystem zu blockieren. (Wenn Sie es sich leisten können, zu warten, bis ein anderer Thread Daten erzeugt oder verbraucht, warum dann auf eine Sperre warten, bis ein anderer Thread die Aktualisierung der Datenstruktur abgeschlossen hat?)
Unter (x86 / x64) Linux ist die Intra-Thread-Synchronisation mit Mutexen relativ kostengünstig, wenn keine Konflikte bestehen. Konzentrieren Sie sich darauf, die Zeit zu minimieren, die Hersteller und Verbraucher benötigen, um ihre Schlösser festzuhalten. Angesichts der Tatsache, dass Sie gesagt haben, dass Sie sich nur um die letzten N aufgezeichneten Datenpunkte kümmern, denke ich, dass ein kreisförmiger Puffer dies ziemlich gut tun würde. Ich verstehe jedoch nicht wirklich, wie dies zu der Blockierungsanforderung und der Idee passt, dass Verbraucher die von ihnen gelesenen Daten tatsächlich verbrauchen (entfernen). (Möchten Sie, dass die Verbraucher nur die letzten N Datenpunkte betrachten und nicht entfernen? Möchten Sie, dass es den Herstellern egal ist, ob die Verbraucher nicht mithalten können, und nur alte Daten überschreiben?)
Wie Zan Lynx kommentierte, können Sie Ihre Daten auch zu größeren Blöcken zusammenfassen / puffern, wenn viele davon eingehen. Sie können eine feste Anzahl von Punkten oder alle innerhalb einer bestimmten Zeit empfangenen Daten puffern . Dies bedeutet, dass weniger Synchronisationsvorgänge ausgeführt werden. Es führt zwar zu einer Latenz, aber wenn Sie kein Echtzeit-Linux verwenden, müssen Sie sich ohnehin bis zu einem gewissen Grad damit befassen.
quelle
Die Implementierung in der Boost-Bibliothek ist eine Überlegung wert. Es ist einfach zu bedienen und ziemlich leistungsstark. Ich habe einen Test geschrieben und ihn auf einem Quad-Core-i7-Laptop (8 Threads) ausgeführt und pro Sekunde ~ 4 Millionen Enqueue / Dequeue-Vorgänge ausgeführt. Eine weitere Implementierung, die bisher nicht erwähnt wurde, ist die MPMC-Warteschlange unter http://moodycamel.com/blog/2014/detailed-design-of-a-lock-free-queue . Ich habe einige einfache Tests mit dieser Implementierung auf demselben Laptop mit 32 Herstellern und 32 Verbrauchern durchgeführt. Es ist, wie angekündigt, schneller als die Boost-Lockless-Warteschlange.
Wie die meisten anderen Antworten ist die programmlose Programmierung schwierig. Bei den meisten Implementierungen ist es schwierig, Eckfälle zu erkennen, deren Behebung viel Testen und Debuggen erfordert. Diese werden normalerweise durch sorgfältiges Platzieren von Speicherbarrieren im Code behoben. In vielen wissenschaftlichen Artikeln finden Sie auch Korrektheitsnachweise. Ich bevorzuge es, diese Implementierungen mit einem Brute-Force-Tool zu testen. Jeder sperrlose Algorithmus, den Sie in der Produktion verwenden möchten, sollte mit einem Tool wie http://research.microsoft.com/en-us/um/people/lamport/tla/tla.html auf Richtigkeit überprüft werden .
quelle
Es gibt eine ziemlich gute Artikelserie über DDJ . Als Zeichen dafür, wie schwierig dieses Zeug sein kann, ist es eine Korrektur eines früheren Artikels , die es falsch verstanden hat. Stellen Sie sicher, dass Sie die Fehler verstehen, bevor Sie Ihre eigenen rollen) -;
quelle
Ich bin kein Experte für Hardware-Speichermodelle und sperrenfreie Datenstrukturen, und ich vermeide es, diese in meinen Projekten zu verwenden, und ich gehe mit traditionellen gesperrten Datenstrukturen um.
Allerdings habe ich kürzlich das Video bemerkt: Lockless SPSC-Warteschlange basierend auf Ringpuffer
Dies basiert auf einer Open-Source-Hochleistungs-Java-Bibliothek namens LMAX Distruptor, die von einem Handelssystem verwendet wird: LMAX Distruptor
Basierend auf der obigen Darstellung machen Sie Kopf- und Schwanzzeiger atomar und prüfen atomar, ob der Kopf den Schwanz von hinten fängt oder umgekehrt.
Unten sehen Sie eine sehr grundlegende C ++ 11-Implementierung dafür:
// USING SEQUENTIAL MEMORY #include<thread> #include<atomic> #include <cinttypes> using namespace std; #define RING_BUFFER_SIZE 1024 // power of 2 for efficient % class lockless_ring_buffer_spsc { public : lockless_ring_buffer_spsc() { write.store(0); read.store(0); } bool try_push(int64_t val) { const auto current_tail = write.load(); const auto next_tail = increment(current_tail); if (next_tail != read.load()) { buffer[current_tail] = val; write.store(next_tail); return true; } return false; } void push(int64_t val) { while( ! try_push(val) ); // TODO: exponential backoff / sleep } bool try_pop(int64_t* pval) { auto currentHead = read.load(); if (currentHead == write.load()) { return false; } *pval = buffer[currentHead]; read.store(increment(currentHead)); return true; } int64_t pop() { int64_t ret; while( ! try_pop(&ret) ); // TODO: exponential backoff / sleep return ret; } private : std::atomic<int64_t> write; std::atomic<int64_t> read; static const int64_t size = RING_BUFFER_SIZE; int64_t buffer[RING_BUFFER_SIZE]; int64_t increment(int n) { return (n + 1) % size; } }; int main (int argc, char** argv) { lockless_ring_buffer_spsc queue; std::thread write_thread( [&] () { for(int i = 0; i<1000000; i++) { queue.push(i); } } // End of lambda expression ); std::thread read_thread( [&] () { for(int i = 0; i<1000000; i++) { queue.pop(); } } // End of lambda expression ); write_thread.join(); read_thread.join(); return 0; }
quelle
size
, so dass das%
(Modulo) nur ein bisschen bitweise UND ist. Das Speichern einer Sequenznummer in Ihren Slots würde außerdem die Konkurrenz zwischen Hersteller und Verbraucher verringern. Dabei muss der Produzent diewrite
Position lesen und umgekehrt, also die Cache-Zeile, die diese atomaren Variablen Ping-Pongs zwischen den Kernen enthält. Unter stackoverflow.com/questions/45907210/… finden Sie eine Slot-Sequenznummer. (Es ist eine Multi-Producer-Multi-Consumer-Warteschlange und könnte stark zu einer einzigen Producer / Consumer-Warteschlange wie dieser vereinfacht werden.)memory_order_acquire
oderrelease
nichtseq_cst
. Dies ist ein großer Unterschied zu x86, woseq_cst
Geschäfte benötigtmfence
(oderxchg
),release
Geschäfte jedoch nur einfache x86-Geschäfte sind. StoreLoad-Barrieren sind auch bei den meisten anderen Architekturen die teuerste Barriere. ( preshing.com/20120710/… )read
danachbuffer
in das Klassenlayout zu setzen , also ist es in einer anderen Cache-Zeile alswrite
. Die beiden Threads lesen also nur Cache-Zeilen, die von den anderen geschrieben wurden, und nicht beide, die in dieselbe Cache-Zeile schreiben. Außerdem sollten sie seinsize_t
: Es macht keinen Sinn, 64-Bit-Zähler mit 32-Bit-Zeigern zu haben. Und ein nicht signierter Typ macht Modulo viel effizienter ( godbolt.org/g/HMVL5C ). Auchuint32_t
wäre für fast alle Anwendungen sinnvoll. Es ist wahrscheinlich am besten, dies nach Größe zu gestalten oder den Puffer dynamisch zuzuweisen.n
Bits einfach mit einemAND
. zBx % 8
=x & 7
, und bitweises UND ist viel billiger alsdiv
oder sogar Tricks, die Sie mit Teilern mit konstanter Kompilierungszeit ausführen können.Eine nützliche Technik, um Konflikte zu reduzieren, besteht darin, die Elemente in mehrere Warteschlangen zu stellen und jeden Verbraucher einem "Thema" zu widmen.
Für die letzte Anzahl von Artikeln, an denen Ihre Kunden interessiert sind - Sie möchten nicht die gesamte Warteschlange sperren und darüber iterieren, um einen Artikel zu finden, der überschrieben werden soll - veröffentlichen Sie einfach Artikel in N-Tupeln, dh alle N letzten Artikel. Bonuspunkte für die Implementierung, bei der der Produzent mit einem Timeout die gesamte Warteschlange blockiert (wenn die Verbraucher nicht mithalten können) und den lokalen Tupel-Cache aktualisiert. Auf diese Weise wird die Datenquelle nicht unter Druck gesetzt.
quelle
Ich würde diesem Artikel zustimmen und davon abraten, sperrenfreie Datenstrukturen zu verwenden. Ein relativ neues Papier auf schleusenfreien FIFOWarteschlangen ist dies , vom selben Autor (en) für weitere Papiere suchen; Es gibt auch eine Doktorarbeit über Chalmers über sperrenfreie Datenstrukturen (ich habe den Link verloren). Sie haben jedoch nicht angegeben, wie groß Ihre Elemente sind. Sperrenfreie Datenstrukturen funktionieren nur mit Elementen in Wortgröße effizient. Daher müssen Sie Ihre Elemente dynamisch zuordnen, wenn sie größer als ein Maschinenwort sind (32 oder 64) Bits). Wenn Sie Elemente dynamisch zuweisen, verschieben Sie den Engpass (vermutlich, da Sie Ihr Programm nicht profiliert haben und im Grunde genommen eine vorzeitige Optimierung durchführen) auf den Speicherzuweiser, sodass Sie einen sperrfreien Speicherzuweiser benötigen, z. B. Streamflowund integrieren Sie es in Ihre Anwendung.
quelle
Sutters Warteschlange ist nicht optimal und er weiß es. Die Art of Multicore-Programmierung ist eine großartige Referenz, aber vertrauen Sie den Java-Leuten bei Speichermodellen nicht. Ross 'Links geben Ihnen keine eindeutige Antwort, da sie ihre Bibliotheken in solchen Problemen hatten und so weiter.
Das sperrenfreie Programmieren ist problematisch, es sei denn, Sie möchten viel Zeit mit etwas verbringen, das Sie eindeutig überarbeitet haben, bevor Sie das Problem lösen (nach der Beschreibung zu urteilen, ist es ein üblicher Wahnsinn, nach Perfektion zu suchen 'in Cache-Kohärenz). Es dauert Jahre und führt dazu, dass die Probleme nicht zuerst gelöst und später eine häufige Krankheit optimiert werden.
quelle
Obwohl dies eine alte Frage ist, erwähnte niemand den sperrenlosen Ringpuffer von DPDK. Es ist ein Ringpuffer mit hohem Durchsatz, der mehrere Hersteller und mehrere Verbraucher unterstützt. Es bietet auch Einzelverbraucher- und Einzelproduzenten-Modi, und der Ringpuffer ist im SPSC-Modus wartungsfrei. Es ist in C geschrieben und unterstützt mehrere Architekturen.
Darüber hinaus werden die Modi "Bulk" und "Burst" unterstützt, in denen Elemente in großen Mengen in die Warteschlange gestellt / aus der Warteschlange entfernt werden können. Mit dem Design können mehrere Konsumenten oder mehrere Produzenten gleichzeitig in die Warteschlange schreiben, indem der Speicherplatz einfach durch Bewegen eines Atomzeigers reserviert wird.
quelle
Der Vollständigkeit halber gibt es in OtlContainers einen gut getesteten sperrenfreien Ringpuffer , der jedoch in Delphi geschrieben ist (TOmniBaseBoundedQueue ist Kreispuffer und TOmniBaseBoundedStack ist ein begrenzter Stapel). Es gibt auch eine unbegrenzte Warteschlange in derselben Einheit (TOmniBaseQueue). Die unbegrenzte Warteschlange wird in der Warteschlange ohne dynamische Sperren beschrieben - richtig machen . Die anfängliche Implementierung der begrenzten Warteschlange (Ringpuffer) wurde schließlich in Eine Warteschlange ohne Sperren beschrieben ! aber der Code wurde seitdem aktualisiert.
quelle
Dies ist ein alter Thread, aber da er noch nicht erwähnt wurde, gibt es im JUCE C ++ - Framework ein sperrenfreies, zirkuläres FIFO mit 1 Hersteller und> 1 Verbraucher.
https://www.juce.com/doc/classAbstractFifo#details
quelle
Schauen Sie sich Disruptor ( Verwendung ) an, einen Ringpuffer, den mehrere Threads abonnieren können:
quelle
So würde ich es machen:
Das Einfügen besteht aus der Verwendung eines CAS mit einem Inkrement und einem Rollover beim nächsten Schreibvorgang. Wenn Sie einen Steckplatz haben, fügen Sie Ihren Wert hinzu und setzen Sie das entsprechende leere / volle Bit.
Das Entfernen erfordert eine Überprüfung des Bits vor dem Testen auf Unterläufe, ist jedoch anders als beim Schreiben, verwendet jedoch den Leseindex und löscht das leere / volle Bit.
Sei gewarnt,
quelle
Sie können lfqueue versuchen
Es ist einfach zu bedienen, es ist kreisförmig frei von Schlössern
int *ret; lfqueue_t results; lfqueue_init(&results); /** Wrap This scope in multithread testing **/ int_data = (int*) malloc(sizeof(int)); assert(int_data != NULL); *int_data = i++; /*Enqueue*/ while (lfqueue_enq(&results, int_data) != 1) ; /*Dequeue*/ while ( (ret = lfqueue_deq(&results)) == NULL); // printf("%d\n", *(int*) ret ); free(ret); /** End **/ lfqueue_clear(&results);
quelle
Es gibt Situationen, in denen Sie keine Sperre benötigen, um den Rennzustand zu verhindern, insbesondere wenn Sie nur einen Hersteller und Verbraucher haben.
Betrachten Sie diesen Absatz aus LDD3:
quelle
Vor einiger Zeit habe ich eine gute Lösung für dieses Problem gefunden. Ich glaube, dass es das kleinste ist, das bisher gefunden wurde.
Das Repository enthält ein Beispiel dafür, wie Sie damit N Threads (Leser und Schreiber) erstellen und dann einen einzelnen Sitzplatz gemeinsam nutzen können.
Ich habe am Testbeispiel einige Benchmarks erstellt und die folgenden Ergebnisse erhalten (in Millionen Ops / Sek.):
Nach Puffergröße
Nach Anzahl der Threads
Beachten Sie, dass die Anzahl der Threads den Durchsatz nicht ändert.
Ich denke, dies ist die ultimative Lösung für dieses Problem. Es funktioniert und ist unglaublich schnell und einfach. Selbst mit Hunderten von Threads und einer Warteschlange an einer einzigen Position. Es kann als Pipeline zwischen Threads verwendet werden, um Speicherplatz in der Warteschlange zuzuweisen.
Das Repository enthält einige frühe Versionen, die in C # und Pascal geschrieben sind. Ich arbeite daran, etwas vollständiger zu polieren, um seine wahren Kräfte zu zeigen.
Ich hoffe, einige von Ihnen können die Arbeit validieren oder mit einigen Ideen helfen. Oder kannst du es zumindest brechen?
quelle
Wenn Sie davon ausgehen, dass der Puffer niemals voll wird, sollten Sie diesen sperrfreien Algorithmus verwenden:
Hier ist eine Java-Implementierung:
class ConcurrentRingBuffer<T> { private static final VarHandle BUFFER = MethodHandles.arrayElementVarHandle(Object[].class); private final int mask; // These three should be on different cache lines private final Object[] buffer; private final AtomicInteger readIndex = new AtomicInteger(); private final AtomicInteger writeIndex = new AtomicInteger(); ConcurrentRingBuffer(int capacity) { assert isPowerOfTwo(capacity); mask = capacity - 1; buffer = new Object[capacity]; } void put(T element) { BUFFER.setRelease(buffer, writeIndex.getAndIncrement() & mask, element); } T take() { Object element; int readIndex = this.readIndex.getAndIncrement() & mask; while ((element = BUFFER.getAcquire(buffer, readIndex)) == null) { Thread.onSpinWait(); } BUFFER.setOpaque(buffer, readIndex, null); return (T) element; } }
quelle