Ich bin seit einiger Zeit frustriert über das Standardverhalten, ThreadPoolExecutor
das die ExecutorService
Thread-Pools unterstützt, die so viele von uns verwenden. Um aus den Javadocs zu zitieren:
Wenn mehr als corePoolSize, aber weniger als maximalPoolSize-Threads ausgeführt werden, wird ein neuer Thread nur erstellt , wenn die Warteschlange voll ist .
Dies bedeutet, dass beim Definieren eines Thread-Pools mit dem folgenden Code der zweite Thread niemals gestartet wird, da der Thread LinkedBlockingQueue
unbegrenzt ist.
ExecutorService threadPool =
new ThreadPoolExecutor(1 /*core*/, 50 /*max*/, 60 /*timeout*/,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(/* unlimited queue */));
Nur wenn Sie eine begrenzte Warteschlange haben und die Warteschlange voll ist, werden Threads über der Kernnummer gestartet. Ich vermute, dass eine große Anzahl von Junior-Java-Multithread-Programmierern dieses Verhalten der nicht kennt ThreadPoolExecutor
.
Jetzt habe ich einen speziellen Anwendungsfall, bei dem dies nicht optimal ist. Ich suche nach Möglichkeiten, ohne meine eigene TPE-Klasse zu schreiben, um das Problem zu umgehen.
Meine Anforderungen gelten für einen Webdienst, der Rückrufe an einen möglicherweise unzuverlässigen Dritten durchführt.
- Ich möchte den Rückruf nicht synchron mit der Webanforderung durchführen, daher möchte ich einen Thread-Pool verwenden.
- Normalerweise bekomme ich ein paar davon pro Minute, daher möchte ich keine
newFixedThreadPool(...)
mit einer großen Anzahl von Threads haben, die meistens inaktiv sind. - Von Zeit zu Zeit bekomme ich einen Ausbruch dieses Datenverkehrs und möchte die Anzahl der Threads auf einen Maximalwert erhöhen (sagen wir 50).
- Ich muss den besten Versuch unternehmen, alle Rückrufe durchzuführen, damit ich weitere Rückrufe über 50 in die Warteschlange stellen kann. Ich möchte den Rest meines Webservers nicht mit a überfordern
newCachedThreadPool()
.
Wie kann ich diese Einschränkung umgehen, ThreadPoolExecutor
wenn die Warteschlange begrenzt und voll sein muss, bevor weitere Threads gestartet werden? Wie kann ich dafür sorgen, dass mehr Threads gestartet werden, bevor Aufgaben in die Warteschlange gestellt werden?
Bearbeiten:
@Flavio macht einen guten Punkt bei der Verwendung von ThreadPoolExecutor.allowCoreThreadTimeOut(true)
, um das Timeout und das Beenden der Kernthreads zu erreichen . Ich habe darüber nachgedacht, aber ich wollte immer noch die Core-Threads-Funktion. Ich wollte nicht, dass die Anzahl der Threads im Pool nach Möglichkeit unter die Kerngröße fällt.
Antworten:
Ich glaube, ich habe endlich eine etwas elegante (vielleicht etwas hackige) Lösung für diese Einschränkung gefunden
ThreadPoolExecutor
. Es geht um erstreckt ,LinkedBlockingQueue
es zu haben Rückkehrfalse
zu ,queue.offer(...)
wenn es bereits einige Tasks in der Liste. Wenn die aktuellen Threads nicht mit den Aufgaben in der Warteschlange Schritt halten, fügt das TPE zusätzliche Threads hinzu. Befindet sich der Pool bereits bei maximaler Anzahl von Threads,RejectedExecutionHandler
wird der aufgerufen. Es ist der Handler, der dasput(...)
in die Warteschlange stellt.Es ist sicherlich seltsam, eine Warteschlange zu schreiben,
offer(...)
die zurückkehren kannfalse
undput()
niemals blockiert, also ist das der Hack-Teil. Dies funktioniert jedoch gut mit der Verwendung der Warteschlange durch TPE, sodass ich kein Problem damit sehe.Hier ist der Code:
Wenn ich mit diesem Mechanismus Aufgaben an die Warteschlange sende, wird der
ThreadPoolExecutor
Wille:offer(...)
wird false zurückgegeben.RejectedExecutionHandler
RejectedExecutionHandler
dann legt die Aufgabe in die Warteschlange durch den ersten verfügbaren Thread in FIFO - Reihenfolge verarbeitet werden.Obwohl in meinem obigen Beispielcode die Warteschlange unbegrenzt ist, können Sie sie auch als begrenzte Warteschlange definieren. Wenn Sie beispielsweise eine Kapazität von 1000 hinzufügen
LinkedBlockingQueue
, wird Folgendes ausgeführt:Auch, wenn Sie verwenden benötigt
offer(...)
in derRejectedExecutionHandler
dann könnte man die Verwendungoffer(E, long, TimeUnit)
Methode stattdessen mitLong.MAX_VALUE
als Timeout.Warnung:
Wenn Sie erwarten, dass dem Executor nach dem Herunterfahren Aufgaben hinzugefügt werden , sollten Sie klüger sein
RejectedExecutionException
,RejectedExecutionHandler
wenn Sie den Executor-Service heruntergefahren haben. Vielen Dank an @RaduToader für diesen Hinweis.Bearbeiten:
Eine weitere Optimierung dieser Antwort könnte darin bestehen, das TPE zu fragen, ob Leerlauf-Threads vorhanden sind, und das Element nur dann in die Warteschlange zu stellen, wenn dies der Fall ist. Sie müssten dafür eine echte Klasse erstellen und eine
ourQueue.setThreadPoolExecutor(tpe);
Methode hinzufügen .Dann
offer(...)
könnte Ihre Methode ungefähr so aussehen:tpe.getPoolSize() == tpe.getMaximumPoolSize()
in diesem Fall nur anrufensuper.offer(...)
.tpe.getPoolSize() > tpe.getActiveCount()
rufen Sie an,super.offer(...)
da es scheinbar inaktive Threads gibt.false
zu einem anderen Thread zurück.Vielleicht das:
Beachten Sie, dass die get-Methoden für TPE teuer sind, da sie auf
volatile
Felder zugreifen oder (im Fall vongetActiveCount()
) das TPE sperren und die Thread-Liste durchlaufen. Außerdem gibt es hier Rennbedingungen, die dazu führen können, dass eine Aufgabe nicht ordnungsgemäß in die Warteschlange gestellt oder ein anderer Thread gegabelt wird, wenn ein Leerlauf-Thread vorhanden ist.quelle
Queue
um dies zu erreichen, nicht gefällt , sind Sie mit Ihrer Idee sicherlich nicht allein: groovy-programming.com/post/26923146865execute(runnable)
,runnable
wird er nur zur Warteschlange hinzugefügt. Wenn Sie anrufenexecute(secondRunnable)
,secondRunnable
wird dies zur Warteschlange hinzugefügt. Aber jetzt, wenn Sie anrufenexecute(thirdRunnable)
,thirdRunnable
wird in einem neuen Thread ausgeführt. Dierunnable
undsecondRunnable
nur einmalige AusführungthirdRunnable
(oder die ursprüngliche Langzeitaufgabe) sind beendet.Stellen Sie die Kerngröße und die maximale Größe auf denselben Wert ein und ermöglichen Sie das Entfernen von Kernthreads aus dem Pool mit
allowCoreThreadTimeOut(true)
.quelle
Ich habe bereits zwei weitere Antworten auf diese Frage, aber ich vermute, dass diese die beste ist.
Es basiert auf der Technik der aktuell akzeptierten Antwort , nämlich:
offer()
Methode der Warteschlange, um (manchmal) false zurückzugeben.ThreadPoolExecutor
entweder einen neuen Thread erzeugt oder die Aufgabe ablehnt, undRejectedExecutionHandler
auf tatsächlich auf Ablehnung der Aufgabe Warteschlange.Das Problem ist, wann
offer()
false zurückgegeben werden sollte. Die aktuell akzeptierte Antwort gibt false zurück, wenn die Warteschlange einige Aufgaben enthält. Wie ich in meinem Kommentar dort ausgeführt habe, führt dies jedoch zu unerwünschten Effekten. Wenn Sie alternativ immer false zurückgeben, werden auch dann immer wieder neue Threads erzeugt, wenn Threads in der Warteschlange warten.Die Lösung besteht darin, Java 7 zu verwenden
LinkedTransferQueue
und einenoffer()
Anruf zu tätigentryTransfer()
. Wenn ein Consumer-Thread wartet, wird die Aufgabe nur an diesen Thread übergeben. Andernfallsoffer()
wird false zurückgegeben undThreadPoolExecutor
ein neuer Thread erzeugt.quelle
queue.offer()
, weil es tatsächlich aufruftLinkedTransferQueue.tryTransfer()
, wird false zurückgeben und die Aufgabe nicht in die Warteschlange stellen. Allerdings dieRejectedExecutionHandler
Aufrufequeue.put()
, die nicht fehlschlagen und die Aufgabe in die Warteschlange stellen.Hinweis: Ich bevorzuge und empfehle jetzt meine andere Antwort .
Hier ist eine Version, die sich für mich viel einfacher anfühlt: Erhöhen Sie die CorePoolSize (bis zur Grenze von MaximumPoolSize), wenn eine neue Aufgabe ausgeführt wird, und verringern Sie dann die CorePoolSize (bis zur Grenze der vom Benutzer angegebenen "Core-Pool-Größe"), wann immer a Aufgabe abgeschlossen.
Um es anders auszudrücken, verfolgen Sie die Anzahl der ausgeführten oder in die Warteschlange gestellten Aufgaben und stellen Sie sicher, dass corePoolSize der Anzahl der Aufgaben entspricht, solange sie zwischen der vom Benutzer angegebenen "Kernpoolgröße" und der maximalenPoolSize liegt.
Wie geschrieben, unterstützt die Klasse das Ändern der benutzerdefinierten CorePoolSize oder MaximumPoolSize nach der Erstellung nicht und das Manipulieren der Arbeitswarteschlange nicht direkt oder über
remove()
oderpurge()
.quelle
synchronized
Blöcke. Können Sie die Warteschlange anrufen, um die Anzahl der Aufgaben zu ermitteln? Oder vielleicht eine verwendenAtomicInteger
?execute()
Anrufen in separaten Threads gibt, wird jeder (1) herausfinden, wie viele Threads benötigt werden, (2)setCorePoolSize
diese Nummer und (3) anrufensuper.execute()
. Wenn die Schritte (1) und (2) nicht synchronisiert sind, bin ich mir nicht sicher, wie ich eine unglückliche Reihenfolge verhindern kann, bei der Sie die Größe des Kernpools nach einer höheren Zahl auf eine niedrigere Zahl setzen. Mit direktem Zugriff auf das Feld der Oberklasse könnte dies stattdessen mit compare-and-set erfolgen, aber ich sehe keinen sauberen Weg, dies in einer Unterklasse ohne Synchronisation zu tun.taskCount
Feld gültig ist (dh aAtomicInteger
). Wenn zwei Threads die Poolgröße unmittelbar nacheinander neu berechnen, sollten die richtigen Werte erhalten werden. Wenn der zweite die Kern-Threads verkleinert, muss er einen Abfall in der Warteschlange oder so gesehen haben.execute()
. Jeder wird anrufenatomicTaskCount.incrementAndGet()
und 10 bzw. 11 erhalten. Aber ohne Synchronisation (über das Abrufen der Aufgabenanzahl und das Festlegen der Kernpoolgröße) könnten Sie dann erhalten, dass (1) Aufgabe 11 die Kernpoolgröße auf 11 setzt, (2) Aufgabe 10 die Kernpoolgröße auf 10 setzt, (3) Task 10 ruft aufsuper.execute()
, (4) Task 11 ruft aufsuper.execute()
und wird in die Warteschlange gestellt.Wir haben eine Unterklasse
ThreadPoolExecutor
, die eine zusätzliche nimmtcreationThreshold
und überschreibtexecute
.Vielleicht hilft das auch, aber deine sieht natürlich künstlerischer aus ...
quelle
offer(...)
Methode nurfalse
bedingt zurückgegeben wird. Vielen Dank!Die empfohlene Antwort behebt nur eines (1) des Problems mit dem JDK-Thread-Pool:
JDK-Thread-Pools sind auf Warteschlangen ausgerichtet. Anstatt einen neuen Thread zu erzeugen, werden sie die Aufgabe in die Warteschlange stellen. Nur wenn die Warteschlange ihr Limit erreicht, erzeugt der Thread-Pool einen neuen Thread.
Das Zurückziehen des Threads erfolgt nicht, wenn die Last leichter wird. Wenn beispielsweise eine Reihe von Jobs auf den Pool trifft, die dazu führen, dass der Pool maximal wird, gefolgt von einer geringen Last von maximal 2 Aufgaben gleichzeitig, verwendet der Pool alle Threads, um die geringe Last zu bedienen, wodurch ein Zurückziehen des Threads verhindert wird. (nur 2 Threads würden benötigt ...)
Unzufrieden mit dem oben genannten Verhalten habe ich einen Pool implementiert, um die oben genannten Mängel zu beheben.
So beheben Sie das Problem: 2) Die Verwendung der Lifo-Zeitplanung behebt das Problem. Diese Idee wurde von Ben Maurer auf der ACM-Konferenz 2015 vorgestellt: Systems @ Facebook scale
So wurde eine neue Implementierung geboren:
LifoThreadPoolExecutorSQP
Bisher verbessert diese Implementierung die Leistung der asynchronen Ausführung für ZEL .
Die Implementierung ist spinfähig, um den Overhead für Kontextwechsel zu reduzieren und für bestimmte Anwendungsfälle eine überlegene Leistung zu erzielen.
Ich hoffe es hilft...
PS: JDK Fork Join Pool implementiert ExecutorService und arbeitet als "normaler" Thread-Pool. Die Implementierung ist performant. Sie verwendet die LIFO-Thread-Planung. Es gibt jedoch keine Kontrolle über die Größe der internen Warteschlange, das Zeitlimit für den Ruhestand ... und vor allem können Aufgaben nicht ausgeführt werden beim Abbrechen unterbrochen
quelle
Hinweis: Ich bevorzuge und empfehle jetzt meine andere Antwort .
Ich habe einen anderen Vorschlag, der der ursprünglichen Idee folgt, die Warteschlange so zu ändern, dass sie false zurückgibt. In diesem Fall können alle Aufgaben in die Warteschlange aufgenommen werden. Wenn jedoch eine Aufgabe danach in die Warteschlange gestellt wird,
execute()
folgt eine Sentinel-No-Op-Aufgabe, die von der Warteschlange abgelehnt wird, wodurch ein neuer Thread erzeugt wird, der das No-Op unmittelbar danach ausführt etwas aus der Warteschlange.Da Worker-Threads möglicherweise
LinkedBlockingQueue
nach einer neuen Aufgabe abfragen, kann eine Aufgabe auch dann in die Warteschlange gestellt werden, wenn ein Thread verfügbar ist. Um zu vermeiden, dass neue Threads erzeugt werden, selbst wenn Threads verfügbar sind, müssen wir verfolgen, wie viele Threads auf neue Aufgaben in der Warteschlange warten, und einen neuen Thread nur dann erzeugen, wenn sich mehr Aufgaben in der Warteschlange befinden als wartende Threads.quelle
Die beste Lösung, die ich mir vorstellen kann, ist zu erweitern.
ThreadPoolExecutor
bietet einige Hakenmethoden an:beforeExecute
undafterExecute
. In Ihrer Erweiterung können Sie eine begrenzte Warteschlange zum Einspeisen von Aufgaben und eine zweite unbegrenzte Warteschlange zum Behandeln des Überlaufs verwenden. Wenn jemand anruftsubmit
, können Sie versuchen, die Anforderung in die begrenzte Warteschlange zu stellen. Wenn Sie auf eine Ausnahme stoßen, stecken Sie die Aufgabe einfach in Ihre Überlaufwarteschlange. Sie können dann denafterExecute
Hook verwenden, um festzustellen, ob sich nach Abschluss einer Aufgabe etwas in der Überlaufwarteschlange befindet. Auf diese Weise kümmert sich der Executor zuerst um das Material in seiner begrenzten Warteschlange und zieht automatisch aus dieser unbegrenzten Warteschlange, wenn es die Zeit erlaubt.Es scheint mehr Arbeit als Ihre Lösung zu sein, aber es geht zumindest nicht darum, Warteschlangen unerwartetes Verhalten zu geben. Ich stelle mir auch vor, dass es eine bessere Möglichkeit gibt, den Status der Warteschlange und der Threads zu überprüfen, als sich auf Ausnahmen zu verlassen, die relativ langsam ausgelöst werden.
quelle
Hinweis: Wenn Sie für JDK ThreadPoolExecutor eine begrenzte Warteschlange haben, erstellen Sie nur dann neue Threads, wenn das Angebot false zurückgibt . Mit CallerRunsPolicy erhalten Sie möglicherweise etwas Nützliches , das ein wenig BackPressure erzeugt und Aufrufe direkt im Aufrufer-Thread ausführt.
Ich brauche Aufgaben aus Fäden am Pool und haben eine ubounded Warteschlange für die Planung erstellt ausgeführt werden, während die Anzahl der Threads im Pool können wachsen oder schrumpfen zwischen corePoolSize und maximumPoolSize so ...
Am Ende habe ich ein vollständiges Kopieren und Einfügen aus ThreadPoolExecutor durchgeführt und die Ausführungsmethode ein wenig geändert , da dies leider nicht durch Erweiterung möglich war (es werden private Methoden aufgerufen ).
Ich wollte nicht sofort neue Threads erzeugen, wenn eine neue Anfrage eintrifft und alle Threads beschäftigt sind (weil ich im Allgemeinen kurzlebige Aufgaben habe). Ich habe einen Schwellenwert hinzugefügt, kann ihn aber jederzeit an Ihre Bedürfnisse anpassen (möglicherweise ist es für IO meistens besser, diesen Schwellenwert zu entfernen).
quelle