Java: ExecutorService, der bei Übermittlung nach einer bestimmten Warteschlangengröße blockiert

82

Ich versuche, eine Lösung zu codieren, bei der ein einzelner Thread E / A-intensive Aufgaben erzeugt, die parallel ausgeführt werden können. Jede Aufgabe verfügt über signifikante In-Memory-Daten. Daher möchte ich in der Lage sein, die Anzahl der Aufgaben zu begrenzen, die gerade anstehen.

Wenn ich ThreadPoolExecutor wie folgt erstelle:

    ThreadPoolExecutor executor = new ThreadPoolExecutor(numWorkerThreads, numWorkerThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(maxQueue));

Dann die executor.submit(callable)Würfe, RejectedExecutionExceptionwenn die Warteschlange voll ist und alle Threads bereits beschäftigt sind.

Was kann ich tun, um zu executor.submit(callable)blockieren, wenn die Warteschlange voll ist und alle Threads beschäftigt sind?

EDIT : Ich habe versucht , dies :

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

Und es erreicht etwas den Effekt, den ich erreichen möchte, aber auf unelegante Weise (grundsätzlich werden abgelehnte Threads im aufrufenden Thread ausgeführt, sodass der aufrufende Thread nicht mehr gesendet werden kann).

EDIT: (5 Jahre nach der Frage)

Wenn Sie diese Frage und ihre Antworten lesen, nehmen Sie die akzeptierte Antwort bitte nicht als eine richtige Lösung. Bitte lesen Sie alle Antworten und Kommentare durch.

Tahir Akhtar
quelle
1
Ich habe zuvor ein Semaphor verwendet, um genau das zu tun, genau wie in der Antwort auf die sehr ähnliche Frage, mit der @axtavt verknüpft ist.
Stephen Denne
2
@TomWolk Zum einen wird eine Aufgabe mehr parallel ausgeführt, als numWorkerThreadswenn der Aufruferthread auch eine Aufgabe ausführt. Das wichtigere Problem ist jedoch, dass die anderen Threads möglicherweise untätig bleiben und auf die nächste Aufgabe warten, wenn der aufrufende Thread eine lange laufende Aufgabe erhält.
Tahir Akhtar
2
@ TahirAkhtar, wahr; Die Warteschlange sollte ausreichend lang sein, damit sie nicht trocken läuft, wenn der Anrufer die Aufgabe selbst ausführen muss. Aber ich denke, es ist ein Vorteil, wenn ein weiterer Thread, der Aufrufer-Thread, zum Ausführen von Aufgaben verwendet werden kann. Wenn der Anrufer nur blockiert, ist der Thread des Anrufers inaktiv. Ich benutze CallerRunsPolicy mit einer Warteschlange, die dreimal so groß ist wie der Threadpool, und sie funktioniert gut und reibungslos. Im Vergleich zu dieser Lösung würde ich eine Temperierung mit dem Framework-Overengineering in Betracht ziehen.
TomWolk
1
@ TomWalk +1 Gute Punkte. Es scheint, als ob ein weiterer Unterschied darin besteht, dass, wenn die Aufgabe aus der Warteschlange abgelehnt und vom Aufruferthread ausgeführt wurde, der Anruferthread eine Anfrage in der falschen Reihenfolge verarbeitet, da er nicht darauf gewartet hat, dass sie in der Warteschlange an die Reihe kommt. Wenn Sie sich bereits für die Verwendung von Threads entschieden haben, müssen Sie alle Abhängigkeiten ordnungsgemäß behandeln, aber nur etwas, das Sie beachten sollten.
Rimsky

Antworten:

63

Ich habe das gleiche getan. Der Trick besteht darin, eine BlockingQueue zu erstellen, bei der die Angebotsmethode () wirklich eine put () ist. (Sie können jedes gewünschte Basis-BlockingQueue-Gerät verwenden).

public class LimitedQueue<E> extends LinkedBlockingQueue<E> 
{
    public LimitedQueue(int maxSize)
    {
        super(maxSize);
    }

    @Override
    public boolean offer(E e)
    {
        // turn offer() and add() into a blocking calls (unless interrupted)
        try {
            put(e);
            return true;
        } catch(InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return false;
    }

}

Beachten Sie, dass dies nur für den Thread-Pool funktioniert corePoolSize==maxPoolSize, wenn Sie dort vorsichtig sind (siehe Kommentare).

jtahlborn
quelle
2
Alternativ können Sie die SynchronousQueue erweitern, um eine Pufferung zu verhindern, und nur direkte Übergaben zulassen.
brendon
Elegant und spricht das Problem direkt an. Angebot () wird zu put (), und put () bedeutet "... gegebenenfalls warten, bis Platz verfügbar ist"
Trenton
5
Ich halte dies nicht für eine gute Idee, da dadurch das Protokoll der Angebotsmethode geändert wird. Die Angebotsmethode sollte ein nicht blockierender Anruf sein.
Mingjiang Shi
6
Ich bin anderer Meinung - dies ändert das Verhalten von ThreadPoolExecutor.execute so, dass die ThreadPoolExecutor-Logik bei einem corePoolSize <maxPoolSize niemals zusätzliche Worker über den Core hinaus hinzufügt.
Krease
5
Zur Verdeutlichung: Ihre Lösung funktioniert nur, solange Sie die Einschränkung wo beibehalten corePoolSize==maxPoolSize. Ohne das lässt ThreadPoolExecutor das entworfene Verhalten nicht mehr zu. Ich suchte nach einer Lösung für dieses Problem, die diese Einschränkung nicht hatte. Siehe meine alternative Antwort unten für den Ansatz, den wir letztendlich gewählt haben.
Krease
15

So habe ich das gelöst:

(Hinweis: Diese Lösung blockiert den Thread, der Callable sendet, und verhindert so, dass die RejectedExecutionException ausgelöst wird.)

public class BoundedExecutor extends ThreadPoolExecutor{

    private final Semaphore semaphore;

    public BoundedExecutor(int bound) {
        super(bound, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        semaphore = new Semaphore(bound);
    }

    /**Submits task to execution pool, but blocks while number of running threads 
     * has reached the bound limit
     */
    public <T> Future<T> submitButBlockIfFull(final Callable<T> task) throws InterruptedException{

        semaphore.acquire();            
        return submit(task);                    
    }


    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);

        semaphore.release();
    }
}
cvacca
quelle
1
Ich gehe davon aus, dass dies in Fällen, in denen corePoolSize < maxPoolSize...: |
Rogerdpack
1
Es funktioniert für den Fall, wo corePoolSize < maxPoolSize. In diesen Fällen ist das Semaphor verfügbar, aber es gibt keinen Thread, und das SynchronousQueuegibt false zurück. Der ThreadPoolExecutorwird dann einen neuen Thread spinnen. Das Problem dieser Lösung ist, dass sie eine Rennbedingung hat . Nach semaphore.release(), aber vor dem Ende des Threads executeerhält submit () die Semaphor-Erlaubnis. WENN die super.submit () , bevor der Lauf execute()beendet ist , wird der Auftrag abgelehnt.
Luís Guilherme
@ LuísGuilherme Aber semaphore.release () wird niemals aufgerufen, bevor der Thread die Ausführung beendet hat. Weil dieser Aufruf in der after Execute (...) Methode erfolgt. Vermisse ich etwas in dem Szenario, das Sie beschreiben?
Cvacca
1
afterExecute wird von demselben Thread aufgerufen, der die Aufgabe ausführt, sodass sie noch nicht abgeschlossen ist. Mach den Test selbst. Implementieren Sie diese Lösung und werfen Sie dem Executor eine Menge Arbeit zu, wenn die Arbeit abgelehnt wird. Sie werden feststellen, dass dies eine Rennbedingung ist und es nicht schwer ist, sie zu reproduzieren.
Luís Guilherme
1
Gehen Sie zu ThreadPoolExecutor und überprüfen Sie die Methode runWorker (Worker w). Sie werden sehen, dass Dinge nach Abschluss von AfterExecute passieren, einschließlich des Entsperrens des Workers und der Erhöhung der Anzahl abgeschlossener Aufgaben. Sie haben also zugelassen, dass Aufgaben eingehen (indem Sie das Semaphor freigeben), ohne über eine Bandbreite zu verfügen, um sie zu verarbeiten (indem Sie processWorkerExit aufgerufen haben).
Luís Guilherme
14

Die derzeit akzeptierte Antwort weist ein potenziell signifikantes Problem auf: Sie ändert das Verhalten von ThreadPoolExecutor.execute so, dass corePoolSize < maxPoolSizedie ThreadPoolExecutor-Logik bei Bedarf niemals zusätzliche Worker über den Kern hinaus hinzufügt.

Von ThreadPoolExecutor .execute (ausführbar):

    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);

Insbesondere wird dieser letzte "else" -Block niemals getroffen.

Eine bessere Alternative besteht darin, etwas Ähnliches wie das zu tun, was OP bereits tut - verwenden Sie einen RejectedExecutionHandler , um dieselbe putLogik auszuführen:

public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    try {
        if (!executor.isShutdown()) {
            executor.getQueue().put(r);
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RejectedExecutionException("Executor was interrupted while the task was waiting to put on work queue", e);
    }
}

Bei diesem Ansatz sind einige Dinge zu beachten, wie in den Kommentaren (unter Bezugnahme auf diese Antwort ) ausgeführt:

  1. Wenn ja corePoolSize==0, gibt es eine Race-Bedingung, bei der alle Threads im Pool sterben können, bevor die Aufgabe sichtbar wird
  2. Die Verwendung einer Implementierung, die die Warteschlangenaufgaben umschließt (gilt nicht für ThreadPoolExecutor), führt zu Problemen, es sei denn, der Handler umschließt sie auf die gleiche Weise.

Unter Berücksichtigung dieser Fallstricke funktioniert diese Lösung für die meisten typischen ThreadPoolExecutors und behandelt den Fall, in dem corePoolSize < maxPoolSize.

Krease
quelle
Wer auch immer herabgestimmt hat - können Sie einen Einblick geben? Gibt es etwas Falsches / Irreführendes / Gefährliches in dieser Antwort? Ich möchte die Gelegenheit haben, auf Ihre Bedenken einzugehen.
Krease
2
Ich habe nicht abgelehnt, aber es scheint eine sehr schlechte Idee zu sein
vanOekel
@vanOekel - danke für den Link - diese Antwort wirft einige gültige Fälle auf, die bekannt sein sollten, wenn dieser Ansatz verwendet wird, aber IMO macht es nicht zu einer "sehr schlechten Idee" - es löst immer noch ein Problem, das in der aktuell akzeptierten Antwort vorhanden ist. Ich habe meine Antwort mit diesen Einschränkungen aktualisiert.
Krease
Wenn die Größe des Kernpools 0 ist und die Aufgabe an den Executor gesendet wird, beginnt der Executor mit dem Erstellen von Threads, wenn die Warteschlange voll ist, um die Aufgabe zu behandeln. Warum ist es dann anfällig für Deadlocks? Ich habe deinen Standpunkt nicht verstanden. Könnten Sie näher darauf eingehen?
Shirgill Farhan
@ ShirgillFarhanAnsari - das ist der Fall, der im vorherigen Kommentar angesprochen wurde. Dies kann passieren, weil das direkte Hinzufügen zur Warteschlange nicht zum Erstellen von Threads / Starten von Workern führt. Es ist eine
Randfall-
4

Ich weiß, dass dies eine alte Frage ist, hatte aber ein ähnliches Problem, dass das Erstellen neuer Aufgaben sehr schnell war und wenn zu viele OutOfMemoryError auftraten, weil vorhandene Aufgaben nicht schnell genug erledigt wurden.

In meinem Fall Callableswerden eingereicht und ich brauche das Ergebnis, daher muss ich alle Futureszurückgegebenen von speichern executor.submit(). Meine Lösung bestand darin, die Futuresin eine BlockingQueuemit maximaler Größe zu setzen. Sobald diese Warteschlange voll ist, werden keine Aufgaben mehr generiert, bis einige abgeschlossen sind (Elemente aus der Warteschlange entfernt). Im Pseudocode:

final ExecutorService executor = Executors.newFixedThreadPool(numWorkerThreads);
final LinkedBlockingQueue<Future> futures = new LinkedBlockingQueue<>(maxQueueSize);
try {   
    Thread taskGenerator = new Thread() {
        @Override
        public void run() {
            while (reader.hasNext) {
                Callable task = generateTask(reader.next());
                Future future = executor.submit(task);
                try {
                    // if queue is full blocks until a task
                    // is completed and hence no future tasks are submitted.
                    futures.put(compoundFuture);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();         
                }
            }
        executor.shutdown();
        }
    }
    taskGenerator.start();

    // read from queue as long as task are being generated
    // or while Queue has elements in it
    while (taskGenerator.isAlive()
                    || !futures.isEmpty()) {
        Future compoundFuture = futures.take();
        // do something
    }
} catch (InterruptedException ex) {
    Thread.currentThread().interrupt();     
} catch (ExecutionException ex) {
    throw new MyException(ex);
} finally {
    executor.shutdownNow();
}
Anfänger_
quelle
2

Ich hatte das ähnliche Problem und implementierte es mithilfe von beforeExecute/afterExecuteHooks von ThreadPoolExecutor:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Blocks current task execution if there is not enough resources for it.
 * Maximum task count usage controlled by maxTaskCount property.
 */
public class BlockingThreadPoolExecutor extends ThreadPoolExecutor {

    private final ReentrantLock taskLock = new ReentrantLock();
    private final Condition unpaused = taskLock.newCondition();
    private final int maxTaskCount;

    private volatile int currentTaskCount;

    public BlockingThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, int maxTaskCount) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.maxTaskCount = maxTaskCount;
    }

    /**
     * Executes task if there is enough system resources for it. Otherwise
     * waits.
     */
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        taskLock.lock();
        try {
            // Spin while we will not have enough capacity for this job
            while (maxTaskCount < currentTaskCount) {
                try {
                    unpaused.await();
                } catch (InterruptedException e) {
                    t.interrupt();
                }
            }
            currentTaskCount++;
        } finally {
            taskLock.unlock();
        }
    }

    /**
     * Signalling that one more task is welcome
     */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        taskLock.lock();
        try {
            currentTaskCount--;
            unpaused.signalAll();
        } finally {
            taskLock.unlock();
        }
    }
}

Das sollte gut genug für dich sein. Übrigens war die ursprüngliche Implementierung auf Aufgabengröße basierend, da eine Aufgabe 100-mal größer sein konnte als eine andere, und das Senden von zwei großen Aufgaben die Box tötete, aber eine große und viele kleine Aufgaben auszuführen war in Ordnung. Wenn Ihre E / A-intensiven Aufgaben ungefähr dieselbe Größe haben, können Sie diese Klasse verwenden. Andernfalls lassen Sie es mich einfach wissen und ich werde eine größenbasierte Implementierung veröffentlichen.

PS Sie möchten ThreadPoolExecutorJavadoc überprüfen . Es ist eine wirklich nette Bedienungsanleitung von Doug Lea, wie es einfach angepasst werden kann.

Petro Semeniuk
quelle
1
Ich frage mich, was passieren wird, wenn ein Thread die Sperre vorExecute () hält und dies sieht maxTaskCount < currentTaskCountund unter der unpausedBedingung wartet . Gleichzeitig versucht ein anderer Thread, die Sperre in afterExecute () zu erlangen, um den Abschluss einer Aufgabe zu signalisieren. Wird es nicht ein Deadlock sein?
Tahir Akhtar
1
Ich habe auch festgestellt, dass diese Lösung den Thread nicht blockiert, der die Aufgaben sendet, wenn die Warteschlange voll ist. Ist RejectedExecutionExceptionalso noch möglich.
Tahir Akhtar
1
Die Semantik von ReentrantLock / Condition-Klassen ähnelt der von Synchronized & Wait / Notify. Wenn die Bedingungsmethoden aufgerufen werden, wird die Sperre aufgehoben, sodass kein Deadlock auftritt.
Petro Semeniuk
Richtig, dieser ExecutorService blockiert Aufgaben bei der Übermittlung, ohne den Aufruferthread zu blockieren. Der Auftrag wird gerade gesendet und asynchron verarbeitet, wenn genügend Systemressourcen dafür vorhanden sind.
Petro Semeniuk
2

Ich habe eine Lösung implementiert, die dem Dekorationsmuster folgt und ein Semaphor verwendet, um die Anzahl der ausgeführten Aufgaben zu steuern. Sie können es mit jedem verwenden Executorund:

  • Geben Sie das Maximum der laufenden Aufgaben an
  • Geben Sie das maximale Zeitlimit an, um auf eine Taskausführungserlaubnis zu warten (wenn das Zeitlimit abgelaufen ist und keine Erlaubnis erhalten wurde, RejectedExecutionExceptionwird a ausgelöst).
import static java.util.concurrent.TimeUnit.MILLISECONDS;

import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;

import javax.annotation.Nonnull;

public class BlockingOnFullQueueExecutorDecorator implements Executor {

    private static final class PermitReleasingDecorator implements Runnable {

        @Nonnull
        private final Runnable delegate;

        @Nonnull
        private final Semaphore semaphore;

        private PermitReleasingDecorator(@Nonnull final Runnable task, @Nonnull final Semaphore semaphoreToRelease) {
            this.delegate = task;
            this.semaphore = semaphoreToRelease;
        }

        @Override
        public void run() {
            try {
                this.delegate.run();
            }
            finally {
                // however execution goes, release permit for next task
                this.semaphore.release();
            }
        }

        @Override
        public final String toString() {
            return String.format("%s[delegate='%s']", getClass().getSimpleName(), this.delegate);
        }
    }

    @Nonnull
    private final Semaphore taskLimit;

    @Nonnull
    private final Duration timeout;

    @Nonnull
    private final Executor delegate;

    public BlockingOnFullQueueExecutorDecorator(@Nonnull final Executor executor, final int maximumTaskNumber, @Nonnull final Duration maximumTimeout) {
        this.delegate = Objects.requireNonNull(executor, "'executor' must not be null");
        if (maximumTaskNumber < 1) {
            throw new IllegalArgumentException(String.format("At least one task must be permitted, not '%d'", maximumTaskNumber));
        }
        this.timeout = Objects.requireNonNull(maximumTimeout, "'maximumTimeout' must not be null");
        if (this.timeout.isNegative()) {
            throw new IllegalArgumentException("'maximumTimeout' must not be negative");
        }
        this.taskLimit = new Semaphore(maximumTaskNumber);
    }

    @Override
    public final void execute(final Runnable command) {
        Objects.requireNonNull(command, "'command' must not be null");
        try {
            // attempt to acquire permit for task execution
            if (!this.taskLimit.tryAcquire(this.timeout.toMillis(), MILLISECONDS)) {
                throw new RejectedExecutionException(String.format("Executor '%s' busy", this.delegate));
            }
        }
        catch (final InterruptedException e) {
            // restore interrupt status
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        this.delegate.execute(new PermitReleasingDecorator(command, this.taskLimit));
    }

    @Override
    public final String toString() {
        return String.format("%s[availablePermits='%s',timeout='%s',delegate='%s']", getClass().getSimpleName(), this.taskLimit.availablePermits(),
                this.timeout, this.delegate);
    }
}
Grzegorz Lehmann
quelle
1

Ich denke, es ist so einfach wie a ArrayBlockingQueueanstelle von aa zu verwenden LinkedBlockingQueue.

Ignoriere mich ... das ist völlig falsch. ThreadPoolExecutorAnrufe Queue#offernicht, putdie den gewünschten Effekt haben würden.

Sie können diese Aufrufe anstelle von erweitern ThreadPoolExecutorund implementieren .execute(Runnable)putoffer

Ich fürchte, das scheint keine völlig zufriedenstellende Antwort zu sein.

Gareth Davis
quelle