Ich stoße auf ein Problem, bei dem, wenn ich versuche, die Größe des Kernpools eines ThreadPoolExecutor
Pools nach dem Erstellen des Pools auf eine andere Anzahl zu ändern , einige Aufgaben zeitweise mit einem abgelehnt werden RejectedExecutionException
, obwohl ich nie mehr als die queueSize + maxPoolSize
Anzahl der Aufgaben übergebe.
Das Problem, das ich zu lösen versuche, besteht darin, ThreadPoolExecutor
die Größe der Kernthreads basierend auf den ausstehenden Ausführungen in der Warteschlange des Thread-Pools zu erweitern. Ich brauche das, weil a standardmäßig nur dann ThreadPoolExecutor
eine neue erstellt Thread
, wenn die Warteschlange voll ist.
Hier ist ein kleines eigenständiges Pure Java 8-Programm, das das Problem demonstriert.
import static java.lang.Math.max;
import static java.lang.Math.min;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolResizeTest {
public static void main(String[] args) throws Exception {
// increase the number of iterations if unable to reproduce
// for me 100 iterations have been enough
int numberOfExecutions = 100;
for (int i = 1; i <= numberOfExecutions; i++) {
executeOnce();
}
}
private static void executeOnce() throws Exception {
int minThreads = 1;
int maxThreads = 5;
int queueCapacity = 10;
ThreadPoolExecutor pool = new ThreadPoolExecutor(
minThreads, maxThreads,
0, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(queueCapacity),
new ThreadPoolExecutor.AbortPolicy()
);
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(() -> resizeThreadPool(pool, minThreads, maxThreads),
0, 10, TimeUnit.MILLISECONDS);
CompletableFuture<Void> taskBlocker = new CompletableFuture<>();
try {
int totalTasksToSubmit = queueCapacity + maxThreads;
for (int i = 1; i <= totalTasksToSubmit; i++) {
// following line sometimes throws a RejectedExecutionException
pool.submit(() -> {
// block the thread and prevent it from completing the task
taskBlocker.join();
});
// Thread.sleep(10); //enabling even a small sleep makes the problem go away
}
} finally {
taskBlocker.complete(null);
scheduler.shutdown();
pool.shutdown();
}
}
/**
* Resize the thread pool if the number of pending tasks are non-zero.
*/
private static void resizeThreadPool(ThreadPoolExecutor pool, int minThreads, int maxThreads) {
int pendingExecutions = pool.getQueue().size();
int approximateRunningExecutions = pool.getActiveCount();
/*
* New core thread count should be the sum of pending and currently executing tasks
* with an upper bound of maxThreads and a lower bound of minThreads.
*/
int newThreadCount = min(maxThreads, max(minThreads, pendingExecutions + approximateRunningExecutions));
pool.setCorePoolSize(newThreadCount);
pool.prestartAllCoreThreads();
}
}
Warum sollte der Pool jemals eine RejectedExecutionException auslösen, wenn ich nie mehr als die queueCapacity + maxThreads sende? Ich ändere niemals die maximale Anzahl von Threads, sodass nach der Definition von ThreadPoolExecutor die Aufgabe entweder in einem Thread oder in der Warteschlange untergebracht werden sollte.
Wenn ich die Größe des Pools nie verändere, lehnt der Thread-Pool natürlich niemals Einsendungen ab. Dies ist auch schwer zu debuggen, da das Problem durch Hinzufügen von Verzögerungen in den Einsendungen behoben wird.
Gibt es Hinweise, wie die RejectedExecutionException behoben werden kann?
quelle
ExecutorService
indem Sie eine vorhandene implementieren , die Aufgaben erneut übermittelt, bei denen die Übermittlung aufgrund der Größenänderung fehlgeschlagen ist?ThreadPoolExecutor
ist sehr wahrscheinlich eine schlechte Idee, und müssten Sie den vorhandenen Code auch in diesem Fall nicht ändern? Am besten geben Sie ein Beispiel dafür, wie Ihr tatsächlicher Code auf den Executor zugreift. Ich wäre überrascht, wenn es viele spezifische Methoden verwenden würdeThreadPoolExecutor
(dh nicht inExecutorService
).Antworten:
Hier ist ein Szenario, warum dies geschieht:
In meinem Beispiel verwende ich minThreads = 0, maxThreads = 2 und queueCapacity = 2, um es kürzer zu machen. Der erste Befehl wird gesendet. Dies erfolgt in der Methode execute:
Für diesen Befehl wird workQueue.offer (Befehl) als addWorker (null, false) ausgeführt. Der Arbeitsthread nimmt diesen Befehl zuerst in der Thread-Ausführungsmethode aus der Warteschlange, sodass die Warteschlange zu diesem Zeitpunkt noch einen Befehl hat.
Der zweite Befehl wird dieses Mal gesendet, wenn workQueue.offer (Befehl) ausgeführt wird. Jetzt ist die Warteschlange voll
Jetzt führt der ScheduledExecutorService die resizeThreadPool-Methode aus, die setCorePoolSize mit maxThreads aufruft. Hier ist die Methode setCorePoolSize:
Diese Methode fügt einen Worker mit addWorker hinzu (null, true). Nein, es werden 2 Worker-Warteschlangen ausgeführt, die maximale und die Warteschlange ist voll.
Der dritte Befehl wird gesendet und schlägt fehl, da workQueue.offer (Befehl) und addWorker (Befehl, false) fehlschlagen, was zur Ausnahme führt:
Ich denke, um dieses Problem zu lösen, sollten Sie die Kapazität der Warteschlange auf das Maximum der Befehle einstellen, die Sie ausführen möchten.
quelle
Ich bin mir nicht sicher, ob dies als Fehler qualifiziert ist. Dies ist das Verhalten, wenn die zusätzlichen Arbeitsthreads erstellt werden, nachdem die Warteschlange voll ist. In Java-Dokumenten wurde jedoch festgestellt, dass der Aufrufer mit abgelehnten Aufgaben umgehen muss.
Java-Dokumente
Wenn Sie die Größe des Kernpools ändern, z. B. erhöhen, werden die zusätzlichen Worker erstellt (
addWorker
Methode insetCorePoolSize
) und der Aufruf zum Erstellen zusätzlicher Work (addWorker
Methode vonexecute
) wird abgelehnt, wenn dieaddWorker
Rückgabe false (add Worker
letzter Codeausschnitt) ist, da bereits genügend zusätzliche Worker vorhanden sind Erstellt von,setCorePoolSize
aber noch nicht ausgeführt, um das Update in der Warteschlange widerzuspiegeln .Relevante Teile
Vergleichen Sie
Verwenden Sie einen benutzerdefinierten Handler für die Ausführung von Wiederholungsversuchen (dies sollte für Ihren Fall funktionieren, da Sie die Obergrenze als maximale Poolgröße festgelegt haben). Bitte nach Bedarf anpassen.
Beachten Sie auch, dass Ihre Verwendung des Herunterfahrens nicht korrekt ist, da dies nicht darauf wartet, dass die übermittelte Aufgabe ausgeführt wird, sondern
awaitTermination
stattdessen mit verwendet wird.quelle