Eine "universelle Konstruktion" ist eine Wrapper-Klasse für ein sequentielles Objekt, mit der es linearisiert werden kann (eine starke Konsistenzbedingung für gleichzeitige Objekte). Hier ist zum Beispiel eine angepasste wartefreie Konstruktion in Java aus [1], die das Vorhandensein einer wartefreien Warteschlange voraussetzt, die die Schnittstelle erfüllt WFQ
(für die nur ein einmaliger Konsens zwischen Threads erforderlich ist) und eine Sequential
Schnittstelle voraussetzt :
public interface WFQ<T> // "FIFO" iteration
{
int enqueue(T t); // returns the sequence number of t
Iterable<T> iterateUntil(int max); // iterates until sequence max
}
public interface Sequential
{
// Apply an invocation (method + arguments)
// and get a response (return value + state)
Response apply(Invocation i);
}
public interface Factory<T> { T generate(); } // generate new default object
public interface Universal extends Sequential {}
public class SlowUniversal implements Universal
{
Factory<? extends Sequential> generator;
WFQ<Invocation> wfq = new WFQ<Invocation>();
Universal(Factory<? extends Sequential> g) { generator = g; }
public Response apply(Invocation i)
{
int max = wfq.enqueue(i);
Sequential s = generator.generate();
for(Invocation invoc : wfq.iterateUntil(max))
s.apply(invoc);
return s.apply(i);
}
}
Diese Implementierung ist nicht sehr zufriedenstellend, da sie sehr langsam ist (Sie erinnern sich an jeden Aufruf und müssen ihn bei jeder Anwendung erneut abspielen - wir haben eine lineare Laufzeit in der Verlaufsgröße). Gibt es eine Möglichkeit, die Schnittstellen WFQ
und Sequential
(in angemessener Weise) zu erweitern, damit wir beim Anwenden eines neuen Aufrufs einige Schritte speichern können?
Können wir dies effizienter gestalten (nicht die lineare Laufzeit in der Verlaufsgröße, vorzugsweise geht auch die Speichernutzung zurück), ohne die wartefreie Eigenschaft zu verlieren?
Klärung
Eine "universelle Konstruktion" ist ein Begriff, von dem ich mir ziemlich sicher bin, dass er von [1] erfunden wurde, der ein thread-unsicheres, aber thread-kompatibles Objekt akzeptiert, das durch die Sequential
Schnittstelle verallgemeinert wird . Unter Verwendung einer wartefreien Warteschlange bietet die erste Konstruktion eine thread-sichere, linearisierbare Version des Objekts, die auch wartefrei ist (dies setzt Determinismus und Anhalten von apply
Operationen voraus ).
Dies ist ineffizient, da bei dieser Methode effektiv jeder lokale Thread von einem sauberen Slate ausgeht und jede jemals aufgezeichnete Operation darauf anwendet . In jedem Fall funktioniert dies, weil die Synchronisierung effektiv ausgeführt wird, indem mit der WFQ
Taste die Reihenfolge festgelegt wird, in der alle Vorgänge ausgeführt werden sollen: Bei jedem Thread-Aufruf apply
wird dasselbe lokale Sequential
Objekt mit derselben Sequenz von Invocation
s angezeigt.
Meine Frage ist, ob wir (z. B.) einen Hintergrundbereinigungsprozess einführen können, der den "Startzustand" aktualisiert, damit wir nicht neu starten müssen. Dies ist nicht so einfach wie ein atomarer Zeiger mit einem Startzeiger - diese Ansätze verlieren leicht die Garantie der Wartefreiheit. Mein Verdacht ist, dass hier ein anderer warteschlangenbasierter Ansatz funktionieren könnte.
Jargon:
- wartefrei - unabhängig von der Anzahl der Threads oder der Entscheidung des Schedulers,
apply
endet die Ausführung in einer nachweislich begrenzten Anzahl von Befehlen für diesen Thread. - Frei von Sperren - wie oben, lässt jedoch die Möglichkeit einer unbegrenzten Ausführungszeit zu, nur für den Fall, dass eine unbegrenzte Anzahl von
apply
Operationen in anderen Threads ausgeführt wird. Optimistische Synchronisationsschemata fallen normalerweise in diese Kategorie. - Blocking - Effizienz dem Scheduler ausgeliefert.
Ein funktionierendes Beispiel, wie angefordert (jetzt auf einer Seite, die nicht abläuft)
[1] Herlihy und Shavit, die Kunst der Multiprozessor-Programmierung .
CopyableSequential
gültig sind. Die Linearisierbarkeit sollte sich dann aus der Tatsache ergeben, dass dies der Fall istSequential
.Antworten:
Hier finden Sie eine Erklärung und ein Beispiel, wie dies erreicht wird. Lassen Sie mich wissen, wenn Teile nicht klar sind.
Hauptinhalt mit Quelle
Universal
Initialisierung:
Thread-Indizes werden atomar inkrementiert angewendet. Dies wird mit einem
AtomicInteger
benannten verwaltetnextIndex
. Diese Indizes werden Threads über eineThreadLocal
Instanz zugewiesen, die sich selbst initialisiert, indem der nächste IndexnextIndex
abgerufen und inkrementiert wird. Dies geschieht, wenn der Index jedes Threads zum ersten Mal abgerufen wird. AThreadLocal
wird erstellt, um die letzte von diesem Thread erstellte Sequenz zu verfolgen. Es wird mit 0 initialisiert. Die sequenzielle Factory-Objektreferenz wird übergeben und gespeichert. EsAtomicReferenceArray
werden zwei Instanzen mit der Größe erstelltn
. Das Endobjekt wird jeder Referenz zugewiesen, nachdem es mit dem vomSequential
Werk bereitgestellten Anfangszustand initialisiert wurde .n
ist die maximal zulässige Anzahl von Threads. Jedes Element in diesen Arrays gehört zum entsprechenden Thread-Index.Methode anwenden:
Dies ist die Methode, die die interessante Arbeit leistet. Es macht Folgendes:
Dann beginnt die Sequenzierungsschleife. Es wird fortgesetzt, bis der aktuelle Aufruf sequenziert wurde:
decideNext()
Der Schlüssel zu der oben beschriebenen verschachtelten Schleife ist die
decideNext()
Methode. Um das zu verstehen, müssen wir uns die Node-Klasse ansehen.Knotenklasse
Diese Klasse gibt Knoten in einer doppelt verknüpften Liste an. In dieser Klasse ist nicht viel los. Die meisten Methoden sind einfache Abrufmethoden, die sich selbst erklären sollten.
Schwanz Methode
Dies gibt eine spezielle Knoteninstanz mit einer Sequenz von 0 zurück. Sie fungiert einfach als Platzhalter, bis ein Aufruf sie ersetzt.
Eigenschaften und Initialisierung
seq
: die Sequenznummer, initialisiert auf -1 (dh nicht sequenziert)invocation
: der Wert des Aufrufs vonapply()
. Beim Bau einstellen.next
:AtomicReference
für die Weiterleitung. einmal zugewiesen, wird dies niemals geändertprevious
:AtomicReference
für den Rückwärtslink, der bei der Sequenzierung zugewiesen und von gelöscht wurdetruncate()
Entscheide als nächstes
Diese Methode ist nur eine in Node mit nicht-trivialer Logik. Kurz gesagt, ein Knoten wird als Kandidat für den nächsten Knoten in der verknüpften Liste angeboten. Die
compareAndSet()
Methode prüft, ob die Referenz null ist, und setzt in diesem Fall die Referenz auf den Kandidaten. Wenn die Referenz bereits festgelegt ist, wird nichts ausgeführt. Diese Operation ist atomar. Wenn also zwei Kandidaten gleichzeitig angeboten werden, wird nur einer ausgewählt. Dies garantiert, dass immer nur ein Knoten als nächster ausgewählt wird. Wenn der Kandidatenknoten ausgewählt ist, wird seine Sequenz auf den nächsten Wert gesetzt und die vorherige Verknüpfung wird auf diesen Knoten gesetzt.Zurück zur Apply-Methode der Universal-Klasse ...
Nachdem
decideNext()
der letzte sequenzierte Knoten (wenn markiert) mit unserem Knoten oder einem Knoten aus demannounce
Array aufgerufen wurde , gibt es zwei mögliche Vorkommnisse: 1. Der Knoten wurde erfolgreich sequenziert. 2. Ein anderer Thread hat diesen Thread vorab geleert.Der nächste Schritt besteht darin, zu überprüfen, ob der Knoten für diesen Aufruf erstellt wurde. Dies könnte passieren, weil dieser Thread ihn erfolgreich sequenziert hat oder ein anderer Thread ihn aus dem
announce
Array aufgenommen und für uns sequenziert hat. Wenn es nicht sequenziert wurde, wird der Vorgang wiederholt. Andernfalls wird der Aufruf beendet, indem das Announce-Array für den Index dieses Threads gelöscht und der Ergebniswert des Aufrufs zurückgegeben wird. Das Ankündigungsarray wird gelöscht, um sicherzustellen, dass keine Verweise auf den Knoten in der Nähe mehr vorhanden sind, die verhindern würden, dass der Knoten mit Datenmüll gesammelt wird, und daher alle Knoten in der verknüpften Liste ab diesem Zeitpunkt auf dem Heap am Leben erhalten.Methode auswerten
Nachdem der Knoten des Aufrufs erfolgreich sequenziert wurde, muss der Aufruf ausgewertet werden. Um dies zu tun, muss zunächst sichergestellt werden, dass die vorhergehenden Aufrufe ausgewertet wurden. Wenn sie das nicht haben, wird dieser Thread nicht warten, aber das wird sofort funktionieren.
Stellen Sie sicher, Prior-Methode
Die
ensurePrior()
Methode erledigt dies, indem sie den vorherigen Knoten in der verknüpften Liste überprüft. Wenn der Status nicht festgelegt ist, wird der vorherige Knoten ausgewertet. Knoten, dass dies rekursiv ist. Wenn der Knoten vor dem vorherigen Knoten nicht ausgewertet wurde, ruft er die Auswertung für diesen Knoten usw. auf.Nachdem bekannt ist, dass der vorherige Knoten einen Status hat, können wir diesen Knoten auswerten. Der letzte Knoten wird abgerufen und einer lokalen Variablen zugewiesen. Wenn diese Referenz null ist, bedeutet dies, dass ein anderer Thread diesen zuvor geleert und diesen Knoten bereits ausgewertet hat. Festlegen des Status. Andernfalls wird der Status des vorherigen Knotens
Sequential
zusammen mit dem Aufruf dieses Knotens an die Methode apply des Objekts übergeben. Der zurückgegebene Status wird auf dem Knoten festgelegt und dietruncate()
Methode aufgerufen, wodurch die Rückwärtsverbindung vom Knoten gelöscht wird, da sie nicht mehr benötigt wird.MoveForward-Methode
Die Methode "Vorwärts verschieben" versucht, alle Kopfverweise auf diesen Knoten zu verschieben, wenn sie nicht bereits auf etwas Weiteres verweisen. Auf diese Weise wird sichergestellt, dass der Kopf eines Threads, der den Aufruf beendet, keinen Verweis auf einen nicht mehr benötigten Knoten beibehält. Die
compareAndSet()
Methode stellt sicher, dass wir den Knoten nur aktualisieren, wenn ein anderer Thread ihn seit dem Abrufen nicht geändert hat.Array ankündigen und helfen
Der Schlüssel, um diesen Ansatz wartungsfrei und nicht nur sperrenfrei zu machen, ist, dass wir nicht davon ausgehen können, dass der Thread-Scheduler jedem Thread die Priorität gibt, wenn er benötigt wird. Wenn jeder Thread einfach versucht, seine eigenen Knoten zu sequenzieren, ist es möglich, dass ein Thread unter Last kontinuierlich vorentleert wird. Um diese Möglichkeit zu berücksichtigen, versucht jeder Thread zunächst, anderen Threads zu helfen, die möglicherweise nicht sequenziert werden können.
Die Grundidee ist, dass, wenn jeder Thread erfolgreich Knoten erstellt, die zugewiesenen Sequenzen monoton ansteigen. Wenn ein Thread oder mehrere Threads ständig einen anderen Thread vorbelegen, wird der Index, mit dem nicht sequenzierte Knoten im
announce
Array gefunden werden, vorwärts verschoben. Selbst wenn jeder Thread, der gerade versucht, einen bestimmten Knoten zu sequenzieren, ständig von einem anderen Thread vorbelegt wird, versuchen schließlich alle Threads, diesen Knoten zu sequenzieren. Zur Veranschaulichung konstruieren wir ein Beispiel mit drei Threads.Am Startpunkt werden die Head- und Announce-Elemente aller drei Threads auf den
tail
Knoten gerichtet. DielastSequence
für jeden Thread ist 0.Zu diesem Zeitpunkt wird Thread 1 mit einem Aufruf ausgeführt. Es überprüft das Announce-Array auf seine letzte Sequenz (Null), bei der es sich um den Knoten handelt, dessen Indexierung derzeit geplant ist. Der Knoten wird sequenziert und
lastSequence
auf 1 gesetzt.Thread 2 wird jetzt mit einem Aufruf ausgeführt. Er überprüft das Announce-Array bei seiner letzten Sequenz (Null) und stellt fest, dass es keine Hilfe benötigt, und versucht daher, den Aufruf zu sequenzieren. Es ist erfolgreich und jetzt
lastSequence
ist es auf 2 gesetzt.Thread 3 wird nun ausgeführt und sieht auch, dass der Knoten an
announce[0]
bereits sequenziert ist und sequenziert seinen eigenen Aufruf. EslastSequence
ist jetzt auf 3 gesetzt.Jetzt wird Thread 1 erneut aufgerufen. Es überprüft das Announce-Array bei Index 1 und stellt fest, dass es bereits sequenziert ist. Gleichzeitig wird Thread 2 aufgerufen. Es überprüft das Announce-Array bei Index 2 und stellt fest, dass es bereits sequenziert ist. Sowohl Thread 1 als auch Thread 2 versuchen nun, ihre eigenen Knoten zu sequenzieren. Thread 2 gewinnt und sequenziert seinen Aufruf. Es
lastSequence
ist auf 4 gesetzt. In der Zwischenzeit wurde Thread 3 aufgerufen. Es prüft den Index itlastSequence
(Mod 3) und stellt fest, dass der Knoten atannounce[0]
nicht sequenziert wurde. Thread 2 wird erneut aufgerufen, wenn sich Thread 1 beim zweiten Versuch befindet. Faden 1findet einen nicht sequenzierten Aufruf, beiannounce[1]
dem es sich um den gerade von Thread 2 erstellten Knoten handelt . Es versucht, den Aufruf von Thread 2 zu sequenzieren und ist erfolgreich. Thread 2 findet seinen eigenen Knoten anannounce[1]
und wurde sequenziert. Es wirdlastSequence
auf 5 gesetzt. Thread 3 wird dann aufgerufen und findet den Knoten, an dem sich Thread 1announce[0]
befindet, immer noch nicht sequenziert und versucht dies zu tun. In der Zwischenzeit wurde Thread 2 ebenfalls aufgerufen und bereitet Thread 3 vor. Es sortiert seinen Knoten und setzt ihnlastSequence
auf 6.Schlechter Faden 1 . Obwohl Thread 3 versucht, die Reihenfolge zu ändern, wurden beide Threads vom Scheduler kontinuierlich vereitelt. Aber zu diesem Zeitpunkt. Thread 2 zeigt jetzt auch auf
announce[0]
(6 mod 3). Alle drei Threads versuchen, denselben Aufruf zu sequenzieren. Unabhängig davon, welcher Thread erfolgreich ist, ist der nächste zu sequenzierende Knoten der wartende Aufruf von Thread 1, dh der Knoten, auf den von verwiesen wirdannounce[0]
.Das ist unvermeidlich. Damit Threads vorab geleert werden können, müssen andere Threads Sequenzierungsknoten sein, und dabei bewegen sie sich kontinuierlich
lastSequence
vorwärts. Wenn der Knoten eines bestimmten Threads ständig nicht sequenziert wird, zeigen schließlich alle Threads auf seinen Index im Announce-Array. Kein Thread wird etwas anderes tun, bis der Knoten, dem er helfen möchte, sequenziert wurde. Im schlimmsten Fall verweisen alle Threads auf denselben nicht sequenzierten Knoten. Daher ist die Zeit, die erforderlich ist, um einen Aufruf zu sequenzieren, eine Funktion der Anzahl der Threads und nicht der Größe der Eingabe.quelle
previous
und einennext
Zeiger benötigt, um gültig zu sein. Es scheint schwierig, eine gültige Historie wartungsfrei zu pflegen und zu erstellen.Meine vorherige Antwort beantwortet die Frage nicht wirklich richtig, aber da das OP sie als nützlich ansieht, werde ich sie so lassen, wie sie ist. Basierend auf dem Code im Link in der Frage, hier ist mein Versuch. Ich habe hier nur grundlegende Tests durchgeführt, aber es scheint, als ob Durchschnittswerte korrekt berechnet werden. Rückmeldung erwünscht, ob dies ordnungsgemäß wartungsfrei ist.
ANMERKUNG : Ich habe die universelle Schnittstelle entfernt und zu einer Klasse gemacht. Dass Universal nicht nur aus Sequentials besteht, sondern auch einer ist, scheint eine unnötige Komplikation zu sein, aber mir fehlt möglicherweise etwas. In der Durchschnittsklasse habe ich die Zustandsvariable als markiert
volatile
. Dies ist nicht erforderlich, damit der Code funktioniert. Um konservativ zu sein (eine gute Idee beim Threading) und zu verhindern, dass jeder Thread alle Berechnungen ausführt (einmal).Sequenziell & Fabrik
Universal
Durchschnittlich
Demo-Code
Ich habe einige Änderungen am Code vorgenommen, als ich ihn hier veröffentlichte. Es sollte in Ordnung sein, aber lass es mich wissen, wenn du Probleme damit hast.
quelle
wfq
, sodass Sie immer noch die gesamte Historie durchlaufen müssen - die Laufzeit hat sich nur um einen konstanten Faktor verbessert.