Ich möchte eine ThreadPoolExecutor
solche erstellen , dass die submit()
Methode blockiert, wenn versucht wird, neue Aufgaben hinzuzufügen , wenn die maximale Größe erreicht ist und die Warteschlange voll ist . Muss ich dafür eine benutzerdefinierte Implementierung implementieren RejectedExecutionHandler
oder gibt es eine Möglichkeit, dies mithilfe einer Standard-Java-Bibliothek zu tun?
java
concurrency
executor
Fixpunkt
quelle
quelle
Antworten:
Eine der möglichen Lösungen, die ich gerade gefunden habe:
Gibt es noch andere Lösungen? Ich würde etwas bevorzugen, das darauf basiert,
RejectedExecutionHandler
da es eine Standardmethode zu sein scheint, um mit solchen Situationen umzugehen.quelle
throw e;
ist nicht im Buch. JCIP ist richtig!Sie können ThreadPoolExecutor und eine blockierende Warteschlange verwenden:
quelle
Sie sollten das verwenden
CallerRunsPolicy
, das die abgelehnte Aufgabe im aufrufenden Thread ausführt. Auf diese Weise können keine neuen Aufgaben an den Executor gesendet werden, bis diese Aufgabe erledigt ist. Zu diesem Zeitpunkt gibt es einige freie Pool-Threads oder der Vorgang wird wiederholt.http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/ThreadPoolExecutor.CallerRunsPolicy.html
Aus den Dokumenten:
Stellen Sie außerdem sicher, dass Sie beim Aufrufen des
ThreadPoolExecutor
Konstruktors eine begrenzte Warteschlange wie ArrayBlockingQueue verwenden. Andernfalls wird nichts abgelehnt.Bearbeiten: Stellen Sie als Antwort auf Ihren Kommentar die Größe der ArrayBlockingQueue so ein, dass sie der maximalen Größe des Thread-Pools entspricht, und verwenden Sie AbortPolicy.
Edit 2: Ok, ich verstehe, worauf du hinaus willst. Was ist damit: Überschreiben Sie die
beforeExecute()
Methode, um zu überprüfen, ob siegetActiveCount()
nicht überschrittengetMaximumPoolSize()
wird. Wenn dies der Fall ist, schlafen Sie und versuchen Sie es erneut.quelle
Hibernate hat eine
BlockPolicy
, die einfach ist und tun kann, was Sie wollen:Siehe: Executors.java
quelle
ThreadPoolExecutor
sagt sogar wörtlich: "Die Methode getQueue () ermöglicht den Zugriff auf die Arbeitswarteschlange zum Zwecke der Überwachung und des Debuggens. Von der Verwendung dieser Methode für andere Zwecke wird dringend abgeraten." Dass dies in einer so weithin bekannten Bibliothek verfügbar ist, ist absolut traurig zu sehen.Die
BoundedExecutor
oben aus Java Concurrency in Practice zitierte Antwort funktioniert nur dann ordnungsgemäß, wenn Sie eine unbegrenzte Warteschlange für den Executor verwenden oder die gebundene Semaphorgröße nicht größer als die Warteschlangengröße ist. Das Semaphor wird vom übergebenden Thread und den Threads im Pool gemeinsam genutzt, sodass der Executor auch dann gesättigt werden kann, wenn die Warteschlangengröße <gebunden <= (Warteschlangengröße + Poolgröße) ist.Die Verwendung
CallerRunsPolicy
ist nur gültig, wenn Ihre Aufgaben nicht für immer ausgeführt werden. In diesem Fall bleibt Ihr übermittelnder Thread fürrejectedExecution
immer erhalten, und eine schlechte Idee, wenn die Ausführung Ihrer Aufgaben lange dauert, da der übermittelnde Thread keine neuen Aufgaben übermitteln kann oder Tun Sie etwas anderes, wenn eine Aufgabe selbst ausgeführt wird.Wenn dies nicht akzeptabel ist, empfehle ich, die Größe der begrenzten Warteschlange des Executors zu überprüfen, bevor Sie eine Aufgabe senden. Wenn die Warteschlange voll ist, warten Sie eine kurze Zeit, bevor Sie erneut versuchen, sie zu senden. Der Durchsatz wird darunter leiden, aber ich schlage vor, es ist eine einfachere Lösung als viele der anderen vorgeschlagenen Lösungen, und Sie werden garantiert, dass keine Aufgaben abgelehnt werden.
quelle
Ich weiß, es ist ein Hack, aber meiner Meinung nach der sauberste Hack zwischen den hier angebotenen ;-)
Da ThreadPoolExecutor die Blockierungswarteschlange "Angebot" anstelle von "Put" verwendet, können Sie das Verhalten von "Angebot" der Blockierungswarteschlange überschreiben:
Ich habe es getestet und es scheint zu funktionieren. Das Implementieren einer Timeout-Richtlinie bleibt als Übung für den Leser übrig.
quelle
Die folgende Klasse umschließt einen ThreadPoolExecutor und blockiert mithilfe eines Semaphors, wenn die Arbeitswarteschlange voll ist:
Diese Wrapper-Klasse basiert auf einer Lösung aus dem Buch Java Concurrency in Practice von Brian Goetz. Die Lösung in diesem Buch verwendet nur zwei Konstruktorparameter: eine
Executor
und eine für das Semaphor verwendete Grenze. Dies wird in der Antwort von Fixpoint gezeigt. Bei diesem Ansatz gibt es ein Problem: Er kann sich in einem Zustand befinden, in dem die Pool-Threads ausgelastet sind, die Warteschlange voll ist, das Semaphor jedoch gerade eine Genehmigung freigegeben hat. (semaphore.release()
im letzten Block). In diesem Status kann eine neue Aufgabe die gerade freigegebene Berechtigung abrufen, wird jedoch abgelehnt, da die Aufgabenwarteschlange voll ist. Natürlich ist das nicht etwas, was du willst; Sie möchten in diesem Fall blockieren.Um dies zu lösen, müssen wir eine unbegrenzte Warteschlange verwenden, wie JCiP klar erwähnt. Das Semaphor fungiert als Schutz und wirkt wie eine virtuelle Warteschlange. Dies hat den Nebeneffekt, dass das Gerät möglicherweise
maxPoolSize + virtualQueueSize + maxPoolSize
Aufgaben enthalten kann. Warum ist das so? Wegen dersemaphore.release()
im Endblock. Wenn alle Pool-Threads diese Anweisung gleichzeitig aufrufen, werdenmaxPoolSize
Genehmigungen freigegeben, sodass dieselbe Anzahl von Aufgaben in die Einheit gelangen kann. Wenn wir eine begrenzte Warteschlange verwenden würden, wäre diese immer noch voll, was zu einer abgelehnten Aufgabe führen würde. Da wir wissen, dass dies nur auftritt, wenn ein Pool-Thread fast fertig ist, ist dies kein Problem. Wir wissen, dass der Pool-Thread nicht blockiert wird, sodass bald eine Aufgabe aus der Warteschlange entfernt wird.Sie können jedoch eine begrenzte Warteschlange verwenden. Stellen Sie einfach sicher, dass die Größe gleich ist
virtualQueueSize + maxPoolSize
. Größere Größen sind nutzlos. Das Semaphor verhindert, dass mehr Elemente eingelassen werden. Kleinere Größen führen zu abgelehnten Aufgaben. Die Wahrscheinlichkeit, dass Aufgaben abgelehnt werden, steigt mit abnehmender Größe. Angenommen, Sie möchten einen begrenzten Executor mit maxPoolSize = 2 und virtualQueueSize = 5. Nehmen Sie dann ein Semaphor mit 5 + 2 = 7 Genehmigungen und einer tatsächlichen Warteschlangengröße von 5 + 2 = 7. Die tatsächliche Anzahl der Aufgaben, die in der Einheit ausgeführt werden können, beträgt dann 2 + 5 + 2 = 9. Wenn der Executor voll ist (5 Aufgaben in der Warteschlange, 2 im Thread-Pool, also 0 Berechtigungen verfügbar) und ALLE Pool-Threads ihre Berechtigungen freigeben, können genau 2 Genehmigungen von eingehenden Aufgaben übernommen werden.Jetzt ist die Lösung von JCiP etwas umständlich zu verwenden, da sie nicht alle diese Einschränkungen erzwingt (unbegrenzte Warteschlange oder mit diesen mathematischen Einschränkungen verbunden usw.). Ich denke, dass dies nur ein gutes Beispiel ist, um zu demonstrieren, wie Sie neue thread-sichere Klassen basierend auf den bereits verfügbaren Teilen erstellen können, aber nicht als ausgewachsene, wiederverwendbare Klasse. Ich glaube nicht, dass Letzteres die Absicht des Autors war.
quelle
Sie können einen benutzerdefinierten RejectedExecutionHandler wie diesen verwenden
quelle
Erstellen Sie Ihre eigene Blockierungswarteschlange, die vom Executor mit dem gewünschten Blockierungsverhalten verwendet werden soll, und geben Sie dabei immer die verfügbare verbleibende Kapazität zurück (stellen Sie sicher, dass der Executor nicht versucht, mehr Threads als seinen Kernpool zu erstellen oder den Ablehnungshandler auszulösen).
Ich glaube, dies bringt Ihnen das Blockierungsverhalten, das Sie suchen. Ein Ablehnungshandler passt niemals in die Rechnung, da dies anzeigt, dass der Executor die Aufgabe nicht ausführen kann. Was ich mir dort vorstellen kann, ist, dass Sie im Handler eine Art "beschäftigtes Warten" haben. Das ist nicht was Sie wollen, Sie wollen eine Warteschlange für den Executor, die den Anrufer blockiert ...
quelle
ThreadPoolExecutor
Verwendet eineoffer
Methode, um der Warteschlange Aufgaben hinzuzufügen. Wenn ich einen benutzerdefinierten BenutzerBlockingQueue
erstellenoffer
würde , der blockiert , würde dies denBlockingQueue
Vertrag brechen .ThreadPoolExecutor
die Verwendung implementiert wurdeoffer
und nichtput
(die blockierende Version)? Wenn es eine Möglichkeit für den Client-Code gäbe, zu sagen, welche wann verwendet werden soll, wären viele Leute, die versuchen, benutzerdefinierte Lösungen von Hand zu rollen, erleichtert gewesenUm Probleme mit der @ FixPoint-Lösung zu vermeiden. Man könnte ListeningExecutorService verwenden und das Semaphor onSuccess und onFailure in FutureCallback freigeben.
quelle
Runnable
Methoden, da diese Methoden vor der normalen Bereinigung der Worker noch aufgerufen werdenThreadPoolExecutor
. Das heißt, Sie müssen weiterhin Ablehnungsausnahmen behandeln.Kürzlich fand ich diese Frage mit dem gleichen Problem. Das OP sagt dies nicht explizit, aber wir möchten nicht das verwenden,
RejectedExecutionHandler
das eine Aufgabe im Thread des Übermittlers ausführt, da dadurch die Arbeitsthreads nicht ausreichend genutzt werden, wenn diese Aufgabe lange ausgeführt wird.Beim Lesen aller Antworten und Kommentare, insbesondere der fehlerhaften Lösung mit dem Semaphor oder beim Verwenden, habe
afterExecute
ich mir den Code des ThreadPoolExecutor genauer angesehen, um festzustellen , ob es einen Ausweg gibt. Ich war erstaunt zu sehen, dass es mehr als 2000 Zeilen (kommentierten) Codes gibt, von denen mir einige schwindelig werden . Angesichts der ziemlich einfachen Anforderung, die ich tatsächlich habe - ein Hersteller, mehrere Verbraucher, lassen Sie den Hersteller blockieren, wenn kein Verbraucher arbeiten kann -, habe ich beschlossen, meine eigene Lösung zu entwickeln. Es ist kein,ExecutorService
sondern nur einExecutor
. Und es passt die Anzahl der Threads nicht an die Arbeitslast an, sondern enthält nur eine feste Anzahl von Threads, was auch meinen Anforderungen entspricht. Hier ist der Code. Fühlen Sie sich frei, darüber zu schimpfen :-)quelle
Ich glaube, es gibt eine ziemlich elegante Möglichkeit, dieses Problem zu lösen, indem das
java.util.concurrent.Semaphore
Verhalten von verwendet und delegiert wirdExecutor.newFixedThreadPool
. Der neue Executor-Dienst führt eine neue Aufgabe nur aus, wenn ein Thread dafür vorhanden ist. Das Blockieren wird von Semaphore mit einer Anzahl von Genehmigungen verwaltet, die der Anzahl von Threads entspricht. Wenn eine Aufgabe abgeschlossen ist, wird eine Genehmigung zurückgegeben.quelle
Ich hatte in der Vergangenheit das gleiche Bedürfnis: eine Art Blockierungswarteschlange mit einer festen Größe für jeden Client, der von einem gemeinsam genutzten Thread-Pool unterstützt wird. Am Ende habe ich meine eigene Art von ThreadPoolExecutor geschrieben:
UserThreadPoolExecutor (Blockierungswarteschlange (pro Client) + Threadpool (von allen Clients gemeinsam genutzt))
Siehe: https://github.com/d4rxh4wx/UserThreadPoolExecutor
Jeder UserThreadPoolExecutor erhält eine maximale Anzahl von Threads von einem gemeinsam genutzten ThreadPoolExecutor
Jeder UserThreadPoolExecutor kann:
quelle
Ich habe diese Ablehnungsrichtlinie im elastischen Suchclient gefunden. Es blockiert den Anrufer-Thread in der Blockierungswarteschlange. Code unten-
quelle
Ich hatte vor kurzem das Bedürfnis, etwas Ähnliches zu erreichen, aber auf einem
ScheduledExecutorService
.Ich musste auch sicherstellen, dass ich die Verzögerung handele, die an die Methode übergeben wird, und sicherstellen, dass entweder die Aufgabe zur Ausführung gesendet wird, wie es der Aufrufer erwartet, oder einfach fehlschlägt, wodurch a ausgelöst wird
RejectedExecutionException
.Andere Methoden
ScheduledThreadPoolExecutor
zum Ausführen oder Senden einer Aufgabe rufen intern auf,#schedule
wodurch wiederum die überschriebenen Methoden aufgerufen werden.Ich habe den Code hier, freue mich über jedes Feedback. https://github.com/AmitabhAwasthi/BlockingScheduler
quelle
Hier ist die Lösung, die wirklich gut zu funktionieren scheint. Es heißt NotifyingBlockingThreadPoolExecutor .
Demo-Programm.
Bearbeiten: Es gibt ein Problem mit diesem Code, die await () -Methode ist fehlerhaft. Das Aufrufen von shutdown () + awaitTermination () scheint gut zu funktionieren.
quelle
Ich mag CallerRunsPolicy nicht immer, zumal es der abgelehnten Aufgabe ermöglicht, die Warteschlange zu überspringen und vor Aufgaben ausgeführt zu werden, die zuvor gesendet wurden. Darüber hinaus kann das Ausführen der Aufgabe im aufrufenden Thread viel länger dauern, als darauf zu warten, dass der erste Steckplatz verfügbar wird.
Ich habe dieses Problem mit einem benutzerdefinierten RejectedExecutionHandler gelöst, der den aufrufenden Thread einfach für eine Weile blockiert und dann versucht, die Aufgabe erneut zu senden:
Diese Klasse kann im Thread-Pool-Executor einfach wie jede andere als RejectedExecutinHandler verwendet werden, zum Beispiel:
Der einzige Nachteil, den ich sehe, ist, dass der aufrufende Thread möglicherweise etwas länger als unbedingt erforderlich gesperrt wird (bis zu 250 ms). Da dieser Executor effektiv rekursiv aufgerufen wird, kann eine sehr lange Wartezeit auf die Verfügbarkeit eines Threads (Stunden) zu einem Stapelüberlauf führen.
Trotzdem gefällt mir diese Methode persönlich. Es ist kompakt, leicht zu verstehen und funktioniert gut.
quelle