Ich versuche, eine Lösung zu codieren, bei der ein einzelner Thread E / A-intensive Aufgaben erzeugt, die parallel ausgeführt werden können. Jede Aufgabe verfügt über signifikante In-Memory-Daten. Daher möchte ich in der Lage sein, die Anzahl der Aufgaben zu begrenzen, die gerade anstehen.
Wenn ich ThreadPoolExecutor wie folgt erstelle:
ThreadPoolExecutor executor = new ThreadPoolExecutor(numWorkerThreads, numWorkerThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(maxQueue));
Dann die executor.submit(callable)
Würfe, RejectedExecutionException
wenn die Warteschlange voll ist und alle Threads bereits beschäftigt sind.
Was kann ich tun, um zu executor.submit(callable)
blockieren, wenn die Warteschlange voll ist und alle Threads beschäftigt sind?
EDIT : Ich habe versucht , dies :
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
Und es erreicht etwas den Effekt, den ich erreichen möchte, aber auf unelegante Weise (grundsätzlich werden abgelehnte Threads im aufrufenden Thread ausgeführt, sodass der aufrufende Thread nicht mehr gesendet werden kann).
EDIT: (5 Jahre nach der Frage)
Wenn Sie diese Frage und ihre Antworten lesen, nehmen Sie die akzeptierte Antwort bitte nicht als eine richtige Lösung. Bitte lesen Sie alle Antworten und Kommentare durch.
quelle
numWorkerThreads
wenn der Aufruferthread auch eine Aufgabe ausführt. Das wichtigere Problem ist jedoch, dass die anderen Threads möglicherweise untätig bleiben und auf die nächste Aufgabe warten, wenn der aufrufende Thread eine lange laufende Aufgabe erhält.Antworten:
Ich habe das gleiche getan. Der Trick besteht darin, eine BlockingQueue zu erstellen, bei der die Angebotsmethode () wirklich eine put () ist. (Sie können jedes gewünschte Basis-BlockingQueue-Gerät verwenden).
public class LimitedQueue<E> extends LinkedBlockingQueue<E> { public LimitedQueue(int maxSize) { super(maxSize); } @Override public boolean offer(E e) { // turn offer() and add() into a blocking calls (unless interrupted) try { put(e); return true; } catch(InterruptedException ie) { Thread.currentThread().interrupt(); } return false; } }
Beachten Sie, dass dies nur für den Thread-Pool funktioniert
corePoolSize==maxPoolSize
, wenn Sie dort vorsichtig sind (siehe Kommentare).quelle
corePoolSize==maxPoolSize
. Ohne das lässt ThreadPoolExecutor das entworfene Verhalten nicht mehr zu. Ich suchte nach einer Lösung für dieses Problem, die diese Einschränkung nicht hatte. Siehe meine alternative Antwort unten für den Ansatz, den wir letztendlich gewählt haben.So habe ich das gelöst:
(Hinweis: Diese Lösung blockiert den Thread, der Callable sendet, und verhindert so, dass die RejectedExecutionException ausgelöst wird.)
public class BoundedExecutor extends ThreadPoolExecutor{ private final Semaphore semaphore; public BoundedExecutor(int bound) { super(bound, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); semaphore = new Semaphore(bound); } /**Submits task to execution pool, but blocks while number of running threads * has reached the bound limit */ public <T> Future<T> submitButBlockIfFull(final Callable<T> task) throws InterruptedException{ semaphore.acquire(); return submit(task); } @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); semaphore.release(); } }
quelle
corePoolSize < maxPoolSize
...: |corePoolSize < maxPoolSize
. In diesen Fällen ist das Semaphor verfügbar, aber es gibt keinen Thread, und dasSynchronousQueue
gibt false zurück. DerThreadPoolExecutor
wird dann einen neuen Thread spinnen. Das Problem dieser Lösung ist, dass sie eine Rennbedingung hat . Nachsemaphore.release()
, aber vor dem Ende des Threadsexecute
erhält submit () die Semaphor-Erlaubnis. WENN die super.submit () , bevor der Laufexecute()
beendet ist , wird der Auftrag abgelehnt.Die derzeit akzeptierte Antwort weist ein potenziell signifikantes Problem auf: Sie ändert das Verhalten von ThreadPoolExecutor.execute so, dass
corePoolSize < maxPoolSize
die ThreadPoolExecutor-Logik bei Bedarf niemals zusätzliche Worker über den Kern hinaus hinzufügt.Von ThreadPoolExecutor .execute (ausführbar):
if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command);
Insbesondere wird dieser letzte "else" -Block niemals getroffen.
Eine bessere Alternative besteht darin, etwas Ähnliches wie das zu tun, was OP bereits tut - verwenden Sie einen RejectedExecutionHandler , um dieselbe
put
Logik auszuführen:public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { if (!executor.isShutdown()) { executor.getQueue().put(r); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RejectedExecutionException("Executor was interrupted while the task was waiting to put on work queue", e); } }
Bei diesem Ansatz sind einige Dinge zu beachten, wie in den Kommentaren (unter Bezugnahme auf diese Antwort ) ausgeführt:
corePoolSize==0
, gibt es eine Race-Bedingung, bei der alle Threads im Pool sterben können, bevor die Aufgabe sichtbar wirdThreadPoolExecutor
), führt zu Problemen, es sei denn, der Handler umschließt sie auf die gleiche Weise.Unter Berücksichtigung dieser Fallstricke funktioniert diese Lösung für die meisten typischen ThreadPoolExecutors und behandelt den Fall, in dem
corePoolSize < maxPoolSize
.quelle
Ich weiß, dass dies eine alte Frage ist, hatte aber ein ähnliches Problem, dass das Erstellen neuer Aufgaben sehr schnell war und wenn zu viele OutOfMemoryError auftraten, weil vorhandene Aufgaben nicht schnell genug erledigt wurden.
In meinem Fall
Callables
werden eingereicht und ich brauche das Ergebnis, daher muss ich alleFutures
zurückgegebenen von speichernexecutor.submit()
. Meine Lösung bestand darin, dieFutures
in eineBlockingQueue
mit maximaler Größe zu setzen. Sobald diese Warteschlange voll ist, werden keine Aufgaben mehr generiert, bis einige abgeschlossen sind (Elemente aus der Warteschlange entfernt). Im Pseudocode:final ExecutorService executor = Executors.newFixedThreadPool(numWorkerThreads); final LinkedBlockingQueue<Future> futures = new LinkedBlockingQueue<>(maxQueueSize); try { Thread taskGenerator = new Thread() { @Override public void run() { while (reader.hasNext) { Callable task = generateTask(reader.next()); Future future = executor.submit(task); try { // if queue is full blocks until a task // is completed and hence no future tasks are submitted. futures.put(compoundFuture); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); } } executor.shutdown(); } } taskGenerator.start(); // read from queue as long as task are being generated // or while Queue has elements in it while (taskGenerator.isAlive() || !futures.isEmpty()) { Future compoundFuture = futures.take(); // do something } } catch (InterruptedException ex) { Thread.currentThread().interrupt(); } catch (ExecutionException ex) { throw new MyException(ex); } finally { executor.shutdownNow(); }
quelle
Ich hatte das ähnliche Problem und implementierte es mithilfe von
beforeExecute/afterExecute
Hooks vonThreadPoolExecutor
:import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** * Blocks current task execution if there is not enough resources for it. * Maximum task count usage controlled by maxTaskCount property. */ public class BlockingThreadPoolExecutor extends ThreadPoolExecutor { private final ReentrantLock taskLock = new ReentrantLock(); private final Condition unpaused = taskLock.newCondition(); private final int maxTaskCount; private volatile int currentTaskCount; public BlockingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, int maxTaskCount) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); this.maxTaskCount = maxTaskCount; } /** * Executes task if there is enough system resources for it. Otherwise * waits. */ @Override protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); taskLock.lock(); try { // Spin while we will not have enough capacity for this job while (maxTaskCount < currentTaskCount) { try { unpaused.await(); } catch (InterruptedException e) { t.interrupt(); } } currentTaskCount++; } finally { taskLock.unlock(); } } /** * Signalling that one more task is welcome */ @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); taskLock.lock(); try { currentTaskCount--; unpaused.signalAll(); } finally { taskLock.unlock(); } } }
Das sollte gut genug für dich sein. Übrigens war die ursprüngliche Implementierung auf Aufgabengröße basierend, da eine Aufgabe 100-mal größer sein konnte als eine andere, und das Senden von zwei großen Aufgaben die Box tötete, aber eine große und viele kleine Aufgaben auszuführen war in Ordnung. Wenn Ihre E / A-intensiven Aufgaben ungefähr dieselbe Größe haben, können Sie diese Klasse verwenden. Andernfalls lassen Sie es mich einfach wissen und ich werde eine größenbasierte Implementierung veröffentlichen.
PS Sie möchten
ThreadPoolExecutor
Javadoc überprüfen . Es ist eine wirklich nette Bedienungsanleitung von Doug Lea, wie es einfach angepasst werden kann.quelle
maxTaskCount < currentTaskCount
und unter derunpaused
Bedingung wartet . Gleichzeitig versucht ein anderer Thread, die Sperre in afterExecute () zu erlangen, um den Abschluss einer Aufgabe zu signalisieren. Wird es nicht ein Deadlock sein?RejectedExecutionException
also noch möglich.Ich habe eine Lösung implementiert, die dem Dekorationsmuster folgt und ein Semaphor verwendet, um die Anzahl der ausgeführten Aufgaben zu steuern. Sie können es mit jedem verwenden
Executor
und:RejectedExecutionException
wird a ausgelöst).import static java.util.concurrent.TimeUnit.MILLISECONDS; import java.time.Duration; import java.util.Objects; import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; import javax.annotation.Nonnull; public class BlockingOnFullQueueExecutorDecorator implements Executor { private static final class PermitReleasingDecorator implements Runnable { @Nonnull private final Runnable delegate; @Nonnull private final Semaphore semaphore; private PermitReleasingDecorator(@Nonnull final Runnable task, @Nonnull final Semaphore semaphoreToRelease) { this.delegate = task; this.semaphore = semaphoreToRelease; } @Override public void run() { try { this.delegate.run(); } finally { // however execution goes, release permit for next task this.semaphore.release(); } } @Override public final String toString() { return String.format("%s[delegate='%s']", getClass().getSimpleName(), this.delegate); } } @Nonnull private final Semaphore taskLimit; @Nonnull private final Duration timeout; @Nonnull private final Executor delegate; public BlockingOnFullQueueExecutorDecorator(@Nonnull final Executor executor, final int maximumTaskNumber, @Nonnull final Duration maximumTimeout) { this.delegate = Objects.requireNonNull(executor, "'executor' must not be null"); if (maximumTaskNumber < 1) { throw new IllegalArgumentException(String.format("At least one task must be permitted, not '%d'", maximumTaskNumber)); } this.timeout = Objects.requireNonNull(maximumTimeout, "'maximumTimeout' must not be null"); if (this.timeout.isNegative()) { throw new IllegalArgumentException("'maximumTimeout' must not be negative"); } this.taskLimit = new Semaphore(maximumTaskNumber); } @Override public final void execute(final Runnable command) { Objects.requireNonNull(command, "'command' must not be null"); try { // attempt to acquire permit for task execution if (!this.taskLimit.tryAcquire(this.timeout.toMillis(), MILLISECONDS)) { throw new RejectedExecutionException(String.format("Executor '%s' busy", this.delegate)); } } catch (final InterruptedException e) { // restore interrupt status Thread.currentThread().interrupt(); throw new IllegalStateException(e); } this.delegate.execute(new PermitReleasingDecorator(command, this.taskLimit)); } @Override public final String toString() { return String.format("%s[availablePermits='%s',timeout='%s',delegate='%s']", getClass().getSimpleName(), this.taskLimit.availablePermits(), this.timeout, this.delegate); } }
quelle
Ich denke, es ist so einfach wie a
ArrayBlockingQueue
anstelle von aa zu verwendenLinkedBlockingQueue
.Ignoriere mich ... das ist völlig falsch.
ThreadPoolExecutor
AnrufeQueue#offer
nicht,put
die den gewünschten Effekt haben würden.Sie können diese Aufrufe anstelle von erweitern
ThreadPoolExecutor
und implementieren .execute(Runnable)
put
offer
Ich fürchte, das scheint keine völlig zufriedenstellende Antwort zu sein.
quelle