ExecutorService, der Aufgaben nach einer Zeitüberschreitung unterbricht

93

Ich suche nach einer ExecutorService- Implementierung, die mit einem Timeout versehen werden kann. Aufgaben, die an den ExecutorService gesendet werden, werden unterbrochen, wenn die Ausführung länger als das Zeitlimit dauert. Die Implementierung eines solchen Tieres ist keine so schwierige Aufgabe, aber ich frage mich, ob jemand von einer vorhandenen Implementierung weiß.

Folgendes habe ich mir aufgrund einiger der folgenden Diskussionen ausgedacht. Irgendwelche Kommentare?

import java.util.List;
import java.util.concurrent.*;

public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor {
    private final long timeout;
    private final TimeUnit timeoutUnit;

    private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
    private final ConcurrentMap<Runnable, ScheduledFuture> runningTasks = new ConcurrentHashMap<Runnable, ScheduledFuture>();

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    @Override
    public void shutdown() {
        timeoutExecutor.shutdown();
        super.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        timeoutExecutor.shutdownNow();
        return super.shutdownNow();
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        if(timeout > 0) {
            final ScheduledFuture<?> scheduled = timeoutExecutor.schedule(new TimeoutTask(t), timeout, timeoutUnit);
            runningTasks.put(r, scheduled);
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        ScheduledFuture timeoutTask = runningTasks.remove(r);
        if(timeoutTask != null) {
            timeoutTask.cancel(false);
        }
    }

    class TimeoutTask implements Runnable {
        private final Thread thread;

        public TimeoutTask(Thread thread) {
            this.thread = thread;
        }

        @Override
        public void run() {
            thread.interrupt();
        }
    }
}
Edward Dale
quelle
Ist diese Startzeit des Timeouts der Zeitpunkt der Einreichung? Oder die Zeit, zu der die Aufgabe ausgeführt wird?
Tim Bender
Gute Frage. Wenn die Ausführung beginnt. Vermutlich mit dem protected void beforeExecute(Thread t, Runnable r)Haken.
Edward Dale
@ scompt.com verwenden Sie diese Lösung noch oder wurde sie ersetzt
Paul Taylor
@PaulTaylor Der Job, bei dem ich diese Lösung implementiert habe, wurde abgelöst. :-)
Edward Dale
Ich brauche genau dies, außer a) ich brauche meinen Hauptplanerdienst als Thread-Pool mit einem einzelnen Dienst-Thread, da meine Aufgaben streng gleichzeitig ausgeführt werden müssen und b) ich muss in der Lage sein, die Zeitüberschreitungsdauer für jede Aufgabe am anzugeben Zeitpunkt, zu dem die Aufgabe eingereicht wird. Ich habe versucht, dies als Ausgangspunkt zu verwenden, aber ScheduledThreadPoolExecutor zu erweitern, aber ich kann keine Möglichkeit finden, die angegebene Zeitüberschreitungsdauer, die zum Zeitpunkt der Aufgabenübermittlung angegeben werden soll, bis zur beforeExecute-Methode zu erhalten. Anregungen dankbar geschätzt!
Michael Ellis

Antworten:

89

Sie können hierfür einen ScheduledExecutorService verwenden . Zuerst würden Sie es nur einmal einreichen, um sofort zu beginnen und die erstellte Zukunft beizubehalten. Danach können Sie eine neue Aufgabe einreichen, die die beibehaltene Zukunft nach einiger Zeit abbricht.

 ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); 
 final Future handler = executor.submit(new Callable(){ ... });
 executor.schedule(new Runnable(){
     public void run(){
         handler.cancel();
     }      
 }, 10000, TimeUnit.MILLISECONDS);

Dadurch wird Ihr Handler (Hauptfunktionalität, die unterbrochen werden soll) 10 Sekunden lang ausgeführt und dann diese bestimmte Aufgabe abgebrochen (dh unterbrochen).

John Vint
quelle
12
Interessante Idee, aber was ist, wenn die Aufgabe vor dem Timeout beendet wird (was normalerweise der Fall ist)? Ich möchte lieber nicht Tonnen von Bereinigungsaufgaben haben, die darauf warten, ausgeführt zu werden, nur um herauszufinden, dass die zugewiesene Aufgabe bereits abgeschlossen ist. Es müsste einen weiteren Thread geben, der die Futures überwacht, wenn sie fertig sind, um ihre Bereinigungsaufgaben zu entfernen.
Edward Dale
3
Der Testamentsvollstrecker plant diesen Abbruch nur einmal. Wenn die Aufgabe abgeschlossen ist, ist der Abbruch ein No-Op und die Arbeit wird unverändert fortgesetzt. Es muss nur ein zusätzliches Thread-Scheudling vorhanden sein, um die Aufgaben abzubrechen, und ein Thread, um sie auszuführen. Sie können zwei Ausführende haben, einen, um Ihre Hauptaufgaben einzureichen, und einen, um sie abzubrechen.
John Vint
3
Das stimmt, aber was ist, wenn das Timeout 5 Stunden beträgt und in dieser Zeit 10.000 Aufgaben ausgeführt werden? Ich möchte vermeiden, dass all diese No-Ops herumliegen und Speicherplatz beanspruchen und Kontextwechsel verursachen.
Edward Dale
1
@ Compt Nicht unbedingt. Es würde 10.000 Aufrufe von future.cancel () geben. Wenn jedoch die Zukunft abgeschlossen ist, wird der Abbruch schnell beendet und es werden keine unnötigen Arbeiten ausgeführt. Wenn Sie nicht möchten, dass 10.000 zusätzliche Aufrufe abgebrochen werden, funktioniert dies möglicherweise nicht, aber der Arbeitsaufwand für die Ausführung einer Aufgabe ist sehr gering.
John Vint
6
@ John W.: Ich habe gerade ein weiteres Problem mit Ihrer Implementierung erkannt. Ich benötige das Timeout, um zu beginnen, wenn die Ausführung der Aufgabe beginnt, wie ich zuvor kommentiert habe. Ich denke, der einzige Weg, dies zu tun, ist die Verwendung des beforeExecuteHakens.
Edward Dale
6

Leider ist die Lösung fehlerhaft. Es gibt eine Art Fehler ScheduledThreadPoolExecutor, der auch in dieser Frage gemeldet wird : Durch das Abbrechen einer übermittelten Aufgabe werden die mit der Aufgabe verbundenen Speicherressourcen nicht vollständig freigegeben. Die Ressourcen werden erst freigegeben, wenn die Aufgabe abläuft.

Wenn Sie daher eine TimeoutThreadPoolExecutormit einer ziemlich langen Ablaufzeit erstellen (eine typische Verwendung) und Aufgaben schnell genug senden, füllen Sie am Ende den Speicher - obwohl die Aufgaben tatsächlich erfolgreich abgeschlossen wurden.

Sie können das Problem mit dem folgenden (sehr groben) Testprogramm sehen:

public static void main(String[] args) throws InterruptedException {
    ExecutorService service = new TimeoutThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS, 
            new LinkedBlockingQueue<Runnable>(), 10, TimeUnit.MINUTES);
    //ExecutorService service = Executors.newFixedThreadPool(1);
    try {
        final AtomicInteger counter = new AtomicInteger();
        for (long i = 0; i < 10000000; i++) {
            service.submit(new Runnable() {
                @Override
                public void run() {
                    counter.incrementAndGet();
                }
            });
            if (i % 10000 == 0) {
                System.out.println(i + "/" + counter.get());
                while (i > counter.get()) {
                    Thread.sleep(10);
                }
            }
        }
    } finally {
        service.shutdown();
    }
}

Das Programm erschöpft den verfügbaren Speicher, wartet jedoch darauf, dass die erzeugten Runnables abgeschlossen sind.

Ich habe eine Weile darüber nachgedacht, aber leider konnte ich keine gute Lösung finden.

BEARBEITEN: Ich habe herausgefunden, dass dieses Problem als JDK-Fehler 6602600 gemeldet wurde und anscheinend erst kürzlich behoben wurde.

Flavio
quelle
4

Schließen Sie die Aufgabe in FutureTask ein, und Sie können ein Zeitlimit für die FutureTask festlegen. Schauen Sie sich das Beispiel in meiner Antwort auf diese Frage an:

Java Native Process Timeout

ZZ Coder
quelle
1
Mir ist klar, dass es einige Möglichkeiten gibt, dies mithilfe der java.util.concurrentKlassen zu tun , aber ich suche nach einer ExecutorServiceImplementierung.
Edward Dale
1
Wenn Sie sagen, dass Ihr ExecutorService die Tatsache verbergen soll, dass Zeitüberschreitungen aus dem Clientcode hinzugefügt werden, können Sie Ihren eigenen ExecutorService implementieren, der jede ihm übergebene ausführbare Datei mit einer FutureTask umschließt, bevor Sie sie ausführen.
Erikprice
2

Nach einer Menge Zeit zum Vermessen
benutze ich schließlich die invokeAllMethode ExecutorService, um dieses Problem zu lösen.
Dadurch wird die Aufgabe während der Ausführung der Aufgabe strikt unterbrochen.
Hier ist ein Beispiel

ExecutorService executorService = Executors.newCachedThreadPool();

try {
    List<Callable<Object>> callables = new ArrayList<>();
    // Add your long time task (callable)
    callables.add(new VaryLongTimeTask());
    // Assign tasks for specific execution timeout (e.g. 2 sec)
    List<Future<Object>> futures = executorService.invokeAll(callables, 2000, TimeUnit.MILLISECONDS);
    for (Future<Object> future : futures) {
        // Getting result
    }
} catch (InterruptedException e) {
    e.printStackTrace();
}

executorService.shutdown();

Die Pro ist Ihnen auch gerne können ListenableFutureauf der gleichen ExecutorService.
Ändern Sie die erste Codezeile nur geringfügig.

ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());

ListeningExecutorServiceist die Listening-Funktion von ExecutorServicebei Google Guava Project ( com.google.guava ))

Johnny
quelle
2
Vielen Dank für den Hinweis invokeAll. Das funktioniert sehr gut. Nur ein Wort der Vorsicht für alle, die darüber nachdenken, dies zu verwenden: obwohlinvokeAll eine Liste von FutureObjekten zurückgegeben wird, scheint es sich tatsächlich um eine Blockierungsoperation zu handeln.
MXRO
1

Es scheint, dass das Problem nicht im JDK-Fehler 6602600 (der am 22.05.2010 behoben wurde) liegt, sondern im falschen Aufruf des Schlafes (10) im Kreis. Hinzu kommt, dass der Haupt-Thread anderen Threads direkt CHANCE geben muss, um ihre Aufgaben zu realisieren, indem SLEEP (0) in JEDEM Zweig des äußeren Kreises aufgerufen wird. Ich denke, es ist besser, Thread.yield () anstelle von Thread.sleep (0) zu verwenden.

Der ergebniskorrigierte Teil des vorherigen Problemcodes lautet wie folgt:

.......................
........................
Thread.yield();         

if (i % 1000== 0) {
System.out.println(i + "/" + counter.get()+ "/"+service.toString());
}

//                
//                while (i > counter.get()) {
//                    Thread.sleep(10);
//                } 

Es funktioniert korrekt mit der Menge des äußeren Zählers bis zu 150 000 000 getesteten Kreisen.

Sergey
quelle
1

Mit der Antwort von John W habe ich eine Implementierung erstellt, die das Zeitlimit korrekt beginnt, wenn die Ausführung der Aufgabe beginnt. Ich schreibe sogar einen Unit Test dafür :)

Es entspricht jedoch nicht meinen Anforderungen, da einige E / A-Vorgänge beim Future.cancel()Aufrufen (dh beim Thread.interrupt()Aufrufen) nicht unterbrochen werden . Einige Beispiele für E / A-Operationen, die beim Thread.interrupt()Aufruf möglicherweise nicht unterbrochen werden , sind Socket.connectund Socket.read(und ich vermute, dass die meisten E / A-Operationen in implementiert sind java.io). Alle E / A-Operationen in java.niosollten unterbrechbar sein, wenn sie Thread.interrupt()aufgerufen werden. Dies ist beispielsweise bei SocketChannel.openund der Fall SocketChannel.read.

Wenn jemand interessiert ist, habe ich eine Übersicht für einen Thread-Pool-Executor erstellt, mit der Aufgaben eine Zeitüberschreitung aufweisen können (wenn sie unterbrechbare Vorgänge verwenden ...): https://gist.github.com/amanteaux/64c54a913c1ae34ad7b86db109cbc0bf

Amanteaux
quelle
Interessanter Code, ich habe ihn in mein System gezogen und bin gespannt, ob Sie einige Beispiele dafür haben, welche Art von E / A-Vorgängen nicht unterbrochen werden, damit ich sehen kann, ob sie sich auf mein System auswirken. Vielen Dank!
Duncan Krebs
@ DuncanKrebs Ich habe meine Antwort mit einem Beispiel für nicht unterbrechbare E / A detailliert beschrieben: Socket.connectundSocket.read
amanteaux
myThread.interrupted()ist nicht die richtige Methode zum Unterbrechen, da das Unterbrechungsflag gelöscht wird. Verwenden Sie myThread.interrupt()stattdessen, und das sollte mit Steckdosen
DanielCuadra
@ DanielCuadra: Danke, es sieht so aus, als hätte ich einen Tippfehler gemacht, da Thread.interrupted()es nicht möglich ist, einen Thread zu unterbrechen. Unterbricht Thread.interrupt()jedoch keine java.ioVorgänge, sondern funktioniert nur bei java.nioVorgängen.
Amanteaux
Ich habe es interrupt()seit vielen Jahren verwendet und es hat immer java.io-Vorgänge unterbrochen (sowie andere Blockierungsmethoden wie Thread-Schlaf, JDBC-Verbindungen, Blockingqueue Take usw.). Vielleicht haben Sie eine Buggy-Klasse oder eine JVM gefunden, die Fehler aufweist
DanielCuadra
0

Was ist mit dieser alternativen Idee:

  • zwei haben zwei Testamentsvollstrecker:
    • eins für :
      • Senden der Aufgabe, ohne sich um das Zeitlimit der Aufgabe zu kümmern
      • Hinzufügen der Zukunft resultiert und der Zeitpunkt, zu dem es zu einer internen Struktur enden sollte
    • eine zum Ausführen eines internen Jobs, der die interne Struktur überprüft, wenn einige Aufgaben eine Zeitüberschreitung aufweisen und abgebrochen werden müssen.

Kleine Probe ist hier:

public class AlternativeExecutorService 
{

private final CopyOnWriteArrayList<ListenableFutureTask> futureQueue       = new CopyOnWriteArrayList();
private final ScheduledThreadPoolExecutor                scheduledExecutor = new ScheduledThreadPoolExecutor(1); // used for internal cleaning job
private final ListeningExecutorService                   threadExecutor    = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5)); // used for
private ScheduledFuture scheduledFuture;
private static final long INTERNAL_JOB_CLEANUP_FREQUENCY = 1000L;

public AlternativeExecutorService()
{
    scheduledFuture = scheduledExecutor.scheduleAtFixedRate(new TimeoutManagerJob(), 0, INTERNAL_JOB_CLEANUP_FREQUENCY, TimeUnit.MILLISECONDS);
}

public void pushTask(OwnTask task)
{
    ListenableFuture<Void> future = threadExecutor.submit(task);  // -> create your Callable
    futureQueue.add(new ListenableFutureTask(future, task, getCurrentMillisecondsTime())); // -> store the time when the task should end
}

public void shutdownInternalScheduledExecutor()
{
    scheduledFuture.cancel(true);
    scheduledExecutor.shutdownNow();
}

long getCurrentMillisecondsTime()
{
    return Calendar.getInstance().get(Calendar.MILLISECOND);
}

class ListenableFutureTask
{
    private final ListenableFuture<Void> future;
    private final OwnTask                task;
    private final long                   milliSecEndTime;

    private ListenableFutureTask(ListenableFuture<Void> future, OwnTask task, long milliSecStartTime)
    {
        this.future = future;
        this.task = task;
        this.milliSecEndTime = milliSecStartTime + task.getTimeUnit().convert(task.getTimeoutDuration(), TimeUnit.MILLISECONDS);
    }

    ListenableFuture<Void> getFuture()
    {
        return future;
    }

    OwnTask getTask()
    {
        return task;
    }

    long getMilliSecEndTime()
    {
        return milliSecEndTime;
    }
}

class TimeoutManagerJob implements Runnable
{
    CopyOnWriteArrayList<ListenableFutureTask> getCopyOnWriteArrayList()
    {
        return futureQueue;
    }

    @Override
    public void run()
    {
        long currentMileSecValue = getCurrentMillisecondsTime();
        for (ListenableFutureTask futureTask : futureQueue)
        {
            consumeFuture(futureTask, currentMileSecValue);
        }
    }

    private void consumeFuture(ListenableFutureTask futureTask, long currentMileSecValue)
    {
        ListenableFuture<Void> future = futureTask.getFuture();
        boolean isTimeout = futureTask.getMilliSecEndTime() >= currentMileSecValue;
        if (isTimeout)
        {
            if (!future.isDone())
            {
                future.cancel(true);
            }
            futureQueue.remove(futureTask);
        }
    }
}

class OwnTask implements Callable<Void>
{
    private long     timeoutDuration;
    private TimeUnit timeUnit;

    OwnTask(long timeoutDuration, TimeUnit timeUnit)
    {
        this.timeoutDuration = timeoutDuration;
        this.timeUnit = timeUnit;
    }

    @Override
    public Void call() throws Exception
    {
        // do logic
        return null;
    }

    public long getTimeoutDuration()
    {
        return timeoutDuration;
    }

    public TimeUnit getTimeUnit()
    {
        return timeUnit;
    }
}
}
Ionut Mesaros
quelle
0

Überprüfen Sie, ob dies für Sie funktioniert.

    public <T,S,K,V> ResponseObject<Collection<ResponseObject<T>>> runOnScheduler(ThreadPoolExecutor threadPoolExecutor,
      int parallelismLevel, TimeUnit timeUnit, int timeToCompleteEachTask, Collection<S> collection,
      Map<K,V> context, Task<T,S,K,V> someTask){
    if(threadPoolExecutor==null){
      return ResponseObject.<Collection<ResponseObject<T>>>builder().errorCode("500").errorMessage("threadPoolExecutor can not be null").build();
    }
    if(someTask==null){
      return ResponseObject.<Collection<ResponseObject<T>>>builder().errorCode("500").errorMessage("Task can not be null").build();
    }
    if(CollectionUtils.isEmpty(collection)){
      return ResponseObject.<Collection<ResponseObject<T>>>builder().errorCode("500").errorMessage("input collection can not be empty").build();
    }

    LinkedBlockingQueue<Callable<T>> callableLinkedBlockingQueue = new LinkedBlockingQueue<>(collection.size());
    collection.forEach(value -> {
      callableLinkedBlockingQueue.offer(()->someTask.perform(value,context)); //pass some values in callable. which can be anything.
    });
    LinkedBlockingQueue<Future<T>> futures = new LinkedBlockingQueue<>();

    int count = 0;

    while(count<parallelismLevel && count < callableLinkedBlockingQueue.size()){
      Future<T> f = threadPoolExecutor.submit(callableLinkedBlockingQueue.poll());
      futures.offer(f);
      count++;
    }

    Collection<ResponseObject<T>> responseCollection = new ArrayList<>();

    while(futures.size()>0){
      Future<T> future = futures.poll();
      ResponseObject<T> responseObject = null;
        try {
          T response = future.get(timeToCompleteEachTask, timeUnit);
          responseObject = ResponseObject.<T>builder().data(response).build();
        } catch (InterruptedException e) {
          future.cancel(true);
        } catch (ExecutionException e) {
          future.cancel(true);
        } catch (TimeoutException e) {
          future.cancel(true);
        } finally {
          if (Objects.nonNull(responseObject)) {
            responseCollection.add(responseObject);
          }
          futures.remove(future);//remove this
          Callable<T> callable = getRemainingCallables(callableLinkedBlockingQueue);
          if(null!=callable){
            Future<T> f = threadPoolExecutor.submit(callable);
            futures.add(f);
          }
        }

    }
    return ResponseObject.<Collection<ResponseObject<T>>>builder().data(responseCollection).build();
  }

  private <T> Callable<T> getRemainingCallables(LinkedBlockingQueue<Callable<T>> callableLinkedBlockingQueue){
    if(callableLinkedBlockingQueue.size()>0){
      return callableLinkedBlockingQueue.poll();
    }
    return null;
  }

Sie können die Anzahl der vom Scheduler verwendeten Threads einschränken und eine Zeitüberschreitung für die Aufgabe festlegen.

Nitish Kumar
quelle