Ich suche nach einer ExecutorService- Implementierung, die mit einem Timeout versehen werden kann. Aufgaben, die an den ExecutorService gesendet werden, werden unterbrochen, wenn die Ausführung länger als das Zeitlimit dauert. Die Implementierung eines solchen Tieres ist keine so schwierige Aufgabe, aber ich frage mich, ob jemand von einer vorhandenen Implementierung weiß.
Folgendes habe ich mir aufgrund einiger der folgenden Diskussionen ausgedacht. Irgendwelche Kommentare?
import java.util.List;
import java.util.concurrent.*;
public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor {
private final long timeout;
private final TimeUnit timeoutUnit;
private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
private final ConcurrentMap<Runnable, ScheduledFuture> runningTasks = new ConcurrentHashMap<Runnable, ScheduledFuture>();
public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, long timeout, TimeUnit timeoutUnit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}
public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}
public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}
public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}
@Override
public void shutdown() {
timeoutExecutor.shutdown();
super.shutdown();
}
@Override
public List<Runnable> shutdownNow() {
timeoutExecutor.shutdownNow();
return super.shutdownNow();
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
if(timeout > 0) {
final ScheduledFuture<?> scheduled = timeoutExecutor.schedule(new TimeoutTask(t), timeout, timeoutUnit);
runningTasks.put(r, scheduled);
}
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
ScheduledFuture timeoutTask = runningTasks.remove(r);
if(timeoutTask != null) {
timeoutTask.cancel(false);
}
}
class TimeoutTask implements Runnable {
private final Thread thread;
public TimeoutTask(Thread thread) {
this.thread = thread;
}
@Override
public void run() {
thread.interrupt();
}
}
}
java
multithreading
concurrency
executorservice
Edward Dale
quelle
quelle
protected void beforeExecute(Thread t, Runnable r)
Haken.Antworten:
Sie können hierfür einen ScheduledExecutorService verwenden . Zuerst würden Sie es nur einmal einreichen, um sofort zu beginnen und die erstellte Zukunft beizubehalten. Danach können Sie eine neue Aufgabe einreichen, die die beibehaltene Zukunft nach einiger Zeit abbricht.
Dadurch wird Ihr Handler (Hauptfunktionalität, die unterbrochen werden soll) 10 Sekunden lang ausgeführt und dann diese bestimmte Aufgabe abgebrochen (dh unterbrochen).
quelle
beforeExecute
Hakens.Leider ist die Lösung fehlerhaft. Es gibt eine Art Fehler
ScheduledThreadPoolExecutor
, der auch in dieser Frage gemeldet wird : Durch das Abbrechen einer übermittelten Aufgabe werden die mit der Aufgabe verbundenen Speicherressourcen nicht vollständig freigegeben. Die Ressourcen werden erst freigegeben, wenn die Aufgabe abläuft.Wenn Sie daher eine
TimeoutThreadPoolExecutor
mit einer ziemlich langen Ablaufzeit erstellen (eine typische Verwendung) und Aufgaben schnell genug senden, füllen Sie am Ende den Speicher - obwohl die Aufgaben tatsächlich erfolgreich abgeschlossen wurden.Sie können das Problem mit dem folgenden (sehr groben) Testprogramm sehen:
Das Programm erschöpft den verfügbaren Speicher, wartet jedoch darauf, dass die erzeugten
Runnable
s abgeschlossen sind.Ich habe eine Weile darüber nachgedacht, aber leider konnte ich keine gute Lösung finden.
BEARBEITEN: Ich habe herausgefunden, dass dieses Problem als JDK-Fehler 6602600 gemeldet wurde und anscheinend erst kürzlich behoben wurde.
quelle
Schließen Sie die Aufgabe in FutureTask ein, und Sie können ein Zeitlimit für die FutureTask festlegen. Schauen Sie sich das Beispiel in meiner Antwort auf diese Frage an:
Java Native Process Timeout
quelle
java.util.concurrent
Klassen zu tun , aber ich suche nach einerExecutorService
Implementierung.Nach einer Menge Zeit zum Vermessen
benutze ich schließlich die
invokeAll
MethodeExecutorService
, um dieses Problem zu lösen.Dadurch wird die Aufgabe während der Ausführung der Aufgabe strikt unterbrochen.
Hier ist ein Beispiel
Die Pro ist Ihnen auch gerne können
ListenableFuture
auf der gleichenExecutorService
.Ändern Sie die erste Codezeile nur geringfügig.
ListeningExecutorService
ist die Listening-Funktion vonExecutorService
bei Google Guava Project ( com.google.guava ))quelle
invokeAll
. Das funktioniert sehr gut. Nur ein Wort der Vorsicht für alle, die darüber nachdenken, dies zu verwenden: obwohlinvokeAll
eine Liste vonFuture
Objekten zurückgegeben wird, scheint es sich tatsächlich um eine Blockierungsoperation zu handeln.Wie wäre es mit der
ExecutorService.shutDownNow()
in http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html beschriebenen Methode ? Es scheint die einfachste Lösung zu sein.quelle
Es scheint, dass das Problem nicht im JDK-Fehler 6602600 (der am 22.05.2010 behoben wurde) liegt, sondern im falschen Aufruf des Schlafes (10) im Kreis. Hinzu kommt, dass der Haupt-Thread anderen Threads direkt CHANCE geben muss, um ihre Aufgaben zu realisieren, indem SLEEP (0) in JEDEM Zweig des äußeren Kreises aufgerufen wird. Ich denke, es ist besser, Thread.yield () anstelle von Thread.sleep (0) zu verwenden.
Der ergebniskorrigierte Teil des vorherigen Problemcodes lautet wie folgt:
Es funktioniert korrekt mit der Menge des äußeren Zählers bis zu 150 000 000 getesteten Kreisen.
quelle
Mit der Antwort von John W habe ich eine Implementierung erstellt, die das Zeitlimit korrekt beginnt, wenn die Ausführung der Aufgabe beginnt. Ich schreibe sogar einen Unit Test dafür :)
Es entspricht jedoch nicht meinen Anforderungen, da einige E / A-Vorgänge beim
Future.cancel()
Aufrufen (dh beimThread.interrupt()
Aufrufen) nicht unterbrochen werden . Einige Beispiele für E / A-Operationen, die beimThread.interrupt()
Aufruf möglicherweise nicht unterbrochen werden , sindSocket.connect
undSocket.read
(und ich vermute, dass die meisten E / A-Operationen in implementiert sindjava.io
). Alle E / A-Operationen injava.nio
sollten unterbrechbar sein, wenn sieThread.interrupt()
aufgerufen werden. Dies ist beispielsweise beiSocketChannel.open
und der FallSocketChannel.read
.Wenn jemand interessiert ist, habe ich eine Übersicht für einen Thread-Pool-Executor erstellt, mit der Aufgaben eine Zeitüberschreitung aufweisen können (wenn sie unterbrechbare Vorgänge verwenden ...): https://gist.github.com/amanteaux/64c54a913c1ae34ad7b86db109cbc0bf
quelle
Socket.connect
undSocket.read
myThread.interrupted()
ist nicht die richtige Methode zum Unterbrechen, da das Unterbrechungsflag gelöscht wird. Verwenden SiemyThread.interrupt()
stattdessen, und das sollte mit SteckdosenThread.interrupted()
es nicht möglich ist, einen Thread zu unterbrechen. UnterbrichtThread.interrupt()
jedoch keinejava.io
Vorgänge, sondern funktioniert nur beijava.nio
Vorgängen.interrupt()
seit vielen Jahren verwendet und es hat immer java.io-Vorgänge unterbrochen (sowie andere Blockierungsmethoden wie Thread-Schlaf, JDBC-Verbindungen, Blockingqueue Take usw.). Vielleicht haben Sie eine Buggy-Klasse oder eine JVM gefunden, die Fehler aufweistWas ist mit dieser alternativen Idee:
Kleine Probe ist hier:
quelle
Überprüfen Sie, ob dies für Sie funktioniert.
Sie können die Anzahl der vom Scheduler verwendeten Threads einschränken und eine Zeitüberschreitung für die Aufgabe festlegen.
quelle