Java ThreadPoolExecutor: Durch das Aktualisieren der Größe des Kernpools werden eingehende Aufgaben zeitweise dynamisch zurückgewiesen

13

Ich stoße auf ein Problem, bei dem, wenn ich versuche, die Größe des Kernpools eines ThreadPoolExecutorPools nach dem Erstellen des Pools auf eine andere Anzahl zu ändern , einige Aufgaben zeitweise mit einem abgelehnt werden RejectedExecutionException, obwohl ich nie mehr als die queueSize + maxPoolSizeAnzahl der Aufgaben übergebe.

Das Problem, das ich zu lösen versuche, besteht darin, ThreadPoolExecutordie Größe der Kernthreads basierend auf den ausstehenden Ausführungen in der Warteschlange des Thread-Pools zu erweitern. Ich brauche das, weil a standardmäßig nur dann ThreadPoolExecutoreine neue erstellt Thread, wenn die Warteschlange voll ist.

Hier ist ein kleines eigenständiges Pure Java 8-Programm, das das Problem demonstriert.

import static java.lang.Math.max;
import static java.lang.Math.min;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolResizeTest {

    public static void main(String[] args) throws Exception {
        // increase the number of iterations if unable to reproduce
        // for me 100 iterations have been enough
        int numberOfExecutions = 100;

        for (int i = 1; i <= numberOfExecutions; i++) {
            executeOnce();
        }
    }

    private static void executeOnce() throws Exception {
        int minThreads = 1;
        int maxThreads = 5;
        int queueCapacity = 10;

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                minThreads, maxThreads,
                0, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(queueCapacity),
                new ThreadPoolExecutor.AbortPolicy()
        );

        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> resizeThreadPool(pool, minThreads, maxThreads),
                0, 10, TimeUnit.MILLISECONDS);
        CompletableFuture<Void> taskBlocker = new CompletableFuture<>();

        try {
            int totalTasksToSubmit = queueCapacity + maxThreads;

            for (int i = 1; i <= totalTasksToSubmit; i++) {
                // following line sometimes throws a RejectedExecutionException
                pool.submit(() -> {
                    // block the thread and prevent it from completing the task
                    taskBlocker.join();
                });
                // Thread.sleep(10); //enabling even a small sleep makes the problem go away
            }
        } finally {
            taskBlocker.complete(null);
            scheduler.shutdown();
            pool.shutdown();
        }
    }

    /**
     * Resize the thread pool if the number of pending tasks are non-zero.
     */
    private static void resizeThreadPool(ThreadPoolExecutor pool, int minThreads, int maxThreads) {
        int pendingExecutions = pool.getQueue().size();
        int approximateRunningExecutions = pool.getActiveCount();

        /*
         * New core thread count should be the sum of pending and currently executing tasks
         * with an upper bound of maxThreads and a lower bound of minThreads.
         */
        int newThreadCount = min(maxThreads, max(minThreads, pendingExecutions + approximateRunningExecutions));

        pool.setCorePoolSize(newThreadCount);
        pool.prestartAllCoreThreads();
    }
}

Warum sollte der Pool jemals eine RejectedExecutionException auslösen, wenn ich nie mehr als die queueCapacity + maxThreads sende? Ich ändere niemals die maximale Anzahl von Threads, sodass nach der Definition von ThreadPoolExecutor die Aufgabe entweder in einem Thread oder in der Warteschlange untergebracht werden sollte.

Wenn ich die Größe des Pools nie verändere, lehnt der Thread-Pool natürlich niemals Einsendungen ab. Dies ist auch schwer zu debuggen, da das Problem durch Hinzufügen von Verzögerungen in den Einsendungen behoben wird.

Gibt es Hinweise, wie die RejectedExecutionException behoben werden kann?

Swaranga Sarma
quelle
Warum stellen Sie nicht Ihre eigene Implementierung bereit, ExecutorServiceindem Sie eine vorhandene implementieren , die Aufgaben erneut übermittelt, bei denen die Übermittlung aufgrund der Größenänderung fehlgeschlagen ist?
Daniu
@daniu das ist eine Problemumgehung. Der Punkt der Fragen ist, warum der Pool jemals eine RejectedExecutionException auslösen sollte, wenn ich nie mehr als die queueCapacity + maxThreads sende. Ich ändere niemals die maximale Anzahl von Threads, sodass nach der Definition von ThreadPoolExecutor die Aufgabe entweder in einem Thread oder in der Warteschlange untergebracht werden sollte.
Swaranga Sarma
Ok, ich habe deine Frage anscheinend falsch verstanden. Was ist es? Möchten Sie wissen, warum das Verhalten stattfindet oder wie Sie es umgehen und Probleme für Sie verursachen?
Daniu
Ja, das Ändern meiner Implementierung in einen Executor-Service ist nicht möglich, da sich ein Großteil des Codes auf den ThreadPoolExecutor bezieht. Wenn ich also immer noch einen veränderbaren ThreadPoolExecutor haben möchte, muss ich wissen, wie ich ihn beheben kann. Der richtige Weg, dies zu tun, besteht darin, ThreadPoolExecutor zu erweitern und Zugriff auf einige seiner geschützten Variablen zu erhalten und die Poolgröße innerhalb eines synchronisierten Blocks für eine von der Superklasse gemeinsam genutzte Sperre zu aktualisieren.
Swaranga Sarma
Eine Erweiterung ThreadPoolExecutorist sehr wahrscheinlich eine schlechte Idee, und müssten Sie den vorhandenen Code auch in diesem Fall nicht ändern? Am besten geben Sie ein Beispiel dafür, wie Ihr tatsächlicher Code auf den Executor zugreift. Ich wäre überrascht, wenn es viele spezifische Methoden verwenden würde ThreadPoolExecutor(dh nicht in ExecutorService).
Daniu

Antworten:

5

Hier ist ein Szenario, warum dies geschieht:

In meinem Beispiel verwende ich minThreads = 0, maxThreads = 2 und queueCapacity = 2, um es kürzer zu machen. Der erste Befehl wird gesendet. Dies erfolgt in der Methode execute:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

Für diesen Befehl wird workQueue.offer (Befehl) als addWorker (null, false) ausgeführt. Der Arbeitsthread nimmt diesen Befehl zuerst in der Thread-Ausführungsmethode aus der Warteschlange, sodass die Warteschlange zu diesem Zeitpunkt noch einen Befehl hat.

Der zweite Befehl wird dieses Mal gesendet, wenn workQueue.offer (Befehl) ausgeführt wird. Jetzt ist die Warteschlange voll

Jetzt führt der ScheduledExecutorService die resizeThreadPool-Methode aus, die setCorePoolSize mit maxThreads aufruft. Hier ist die Methode setCorePoolSize:

 public void setCorePoolSize(int corePoolSize) {
    if (corePoolSize < 0)
        throw new IllegalArgumentException();
    int delta = corePoolSize - this.corePoolSize;
    this.corePoolSize = corePoolSize;
    if (workerCountOf(ctl.get()) > corePoolSize)
        interruptIdleWorkers();
    else if (delta > 0) {
        // We don't really know how many new threads are "needed".
        // As a heuristic, prestart enough new workers (up to new
        // core size) to handle the current number of tasks in
        // queue, but stop if queue becomes empty while doing so.
        int k = Math.min(delta, workQueue.size());
        while (k-- > 0 && addWorker(null, true)) {
            if (workQueue.isEmpty())
                break;
        }
    }
}

Diese Methode fügt einen Worker mit addWorker hinzu (null, true). Nein, es werden 2 Worker-Warteschlangen ausgeführt, die maximale und die Warteschlange ist voll.

Der dritte Befehl wird gesendet und schlägt fehl, da workQueue.offer (Befehl) und addWorker (Befehl, false) fehlschlagen, was zur Ausnahme führt:

java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@24c22fe rejected from java.util.concurrent.ThreadPoolExecutor@cd1e646[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
at ThreadPoolResizeTest.executeOnce(ThreadPoolResizeTest.java:60)
at ThreadPoolResizeTest.runTest(ThreadPoolResizeTest.java:28)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:263)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:69)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:48)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222)
at org.junit.runners.ParentRunner.run(ParentRunner.java:292)
at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)

Ich denke, um dieses Problem zu lösen, sollten Sie die Kapazität der Warteschlange auf das Maximum der Befehle einstellen, die Sie ausführen möchten.

Thomas Krieger
quelle
Richtig. Ich konnte repro, indem ich den Code in meine eigene Klasse kopierte und Logger hinzufügte. Wenn die Warteschlange voll ist und ich eine neue Aufgabe einreiche, wird grundsätzlich versucht, einen neuen Worker zu erstellen. Wenn mein Resizer in diesem Moment auch setCorePoolSize auf 2 aufruft, wird auch ein neuer Worker erstellt. Zu diesem Zeitpunkt konkurrieren zwei Worker um das Hinzufügen, aber beide können es nicht sein, da dies gegen die Beschränkung der maximalen Poolgröße verstoßen würde, sodass die neue Aufgabenübermittlung abgelehnt wird. Ich denke, dies ist eine Rennbedingung und ich habe einen Fehlerbericht bei OpenJDK eingereicht. Wir werden sehen. Aber du hast meine Frage beantwortet, damit du das Kopfgeld bekommst. Vielen Dank.
Swaranga Sarma
2

Ich bin mir nicht sicher, ob dies als Fehler qualifiziert ist. Dies ist das Verhalten, wenn die zusätzlichen Arbeitsthreads erstellt werden, nachdem die Warteschlange voll ist. In Java-Dokumenten wurde jedoch festgestellt, dass der Aufrufer mit abgelehnten Aufgaben umgehen muss.

Java-Dokumente

Fabrik für neue Fäden. Alle Threads werden mit dieser Factory erstellt (über die Methode addWorker). Alle Anrufer müssen darauf vorbereitet sein, dass addWorker fehlschlägt. Dies kann eine System- oder Benutzerrichtlinie widerspiegeln, die die Anzahl der Threads begrenzt. Auch wenn dies nicht als Fehler behandelt wird, kann das Versagen beim Erstellen von Threads dazu führen, dass neue Aufgaben abgelehnt werden oder vorhandene in der Warteschlange hängen bleiben.

Wenn Sie die Größe des Kernpools ändern, z. B. erhöhen, werden die zusätzlichen Worker erstellt ( addWorkerMethode in setCorePoolSize) und der Aufruf zum Erstellen zusätzlicher Work ( addWorkerMethode von execute) wird abgelehnt, wenn die addWorkerRückgabe false ( add Workerletzter Codeausschnitt) ist, da bereits genügend zusätzliche Worker vorhanden sind Erstellt von, setCorePoolSize aber noch nicht ausgeführt, um das Update in der Warteschlange widerzuspiegeln .

Relevante Teile

Vergleichen Sie

public void setCorePoolSize(int corePoolSize) {
    ....
    int k = Math.min(delta, workQueue.size());
    while (k-- > 0 && addWorker(null, true)) {
        if (workQueue.isEmpty())
             break;
    }
}

public void execute(Runnable command) {
    ...
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

private boolean addWorker(Runnable firstTask, boolean core) {
....
   if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
     return false;             
}

Verwenden Sie einen benutzerdefinierten Handler für die Ausführung von Wiederholungsversuchen (dies sollte für Ihren Fall funktionieren, da Sie die Obergrenze als maximale Poolgröße festgelegt haben). Bitte nach Bedarf anpassen.

public static class RetryRejectionPolicy implements RejectedExecutionHandler {
    public RetryRejectionPolicy () {}

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
           while(true)
            if(e.getQueue().offer(r)) break;
        }
    }
}

ThreadPoolExecutor pool = new ThreadPoolExecutor(
      minThreads, maxThreads,
      0, TimeUnit.SECONDS,
      new LinkedBlockingQueue<Runnable>(queueCapacity),
      new ThreadPoolResizeTest.RetryRejectionPolicy()
 );

Beachten Sie auch, dass Ihre Verwendung des Herunterfahrens nicht korrekt ist, da dies nicht darauf wartet, dass die übermittelte Aufgabe ausgeführt wird, sondern awaitTerminationstattdessen mit verwendet wird.

Sagar Veeram
quelle
Ich denke, das Herunterfahren wartet laut JavaDoc auf bereits übermittelte Aufgaben: shutdown () Initiiert ein ordnungsgemäßes Herunterfahren, bei dem zuvor übermittelte Aufgaben ausgeführt werden, aber keine neuen Aufgaben akzeptiert werden.
Thomas Krieger
@ThomasKrieger - Die bereits übermittelten Aufgaben werden ausgeführt, aber nicht auf ihren Abschluss gewartet - von docs docs.oracle.com/javase/7/docs/api/java/util/concurrent/… - Diese Methode wartet nicht auf zuvor übermittelte Aufgaben Aufgaben, um die Ausführung abzuschließen. Verwenden Sie dazu awaitTermination.
Sagar Veeram