Behandeln von Ausnahmen von Java ExecutorService-Aufgaben

213

Ich versuche Java zu verwenden ThreadPoolExecutor Klasse zu verwenden, um eine große Anzahl schwerer Aufgaben mit einer festen Anzahl von Threads auszuführen. Jede der Aufgaben hat viele Stellen, an denen sie aufgrund von Ausnahmen fehlschlagen kann.

Ich habe eine Unterklasse erstellt ThreadPoolExecutorund die afterExecuteMethode überschrieben , die alle nicht erfassten Ausnahmen bereitstellen soll, die beim Ausführen einer Aufgabe auftreten. Ich kann es jedoch nicht zum Laufen bringen.

Beispielsweise:

public class ThreadPoolErrors extends ThreadPoolExecutor {
    public ThreadPoolErrors() {
        super(  1, // core threads
                1, // max threads
                1, // timeout
                TimeUnit.MINUTES, // timeout units
                new LinkedBlockingQueue<Runnable>() // work queue
        );
    }

    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if(t != null) {
            System.out.println("Got an error: " + t);
        } else {
            System.out.println("Everything's fine--situation normal!");
        }
    }

    public static void main( String [] args) {
        ThreadPoolErrors threadPool = new ThreadPoolErrors();
        threadPool.submit( 
                new Runnable() {
                    public void run() {
                        throw new RuntimeException("Ouch! Got an error.");
                    }
                }
        );
        threadPool.shutdown();
    }
}

Die Ausgabe dieses Programms lautet "Alles ist in Ordnung - Situation normal!" obwohl die einzige Runnable, die an den Thread-Pool gesendet wurde, eine Ausnahme auslöst. Irgendwelche Hinweise darauf, was hier los ist?

Vielen Dank!

Tom
quelle
Sie haben nie die Zukunft der Aufgabe abgefragt, was dort passiert ist. Der gesamte Service Executor oder das gesamte Programm wird nicht abstürzen. Die Ausnahme wird abgefangen und unter ExecutionException eingeschlossen. Und wird er erneut geworfen, wenn Sie future.get () aufrufen? PS: Die future.isDone () [Bitte lesen Sie den echten API-Namen] gibt true zurück, auch wenn die ausführbare Datei fehlerhaft beendet wurde. Weil die Aufgabe wirklich erledigt ist.
Jai Pandit

Antworten:

156

Aus den Dokumenten :

Hinweis: Wenn Aktionen in Aufgaben (z. B. FutureTask) entweder explizit oder über Methoden wie "submit" eingeschlossen sind, erfassen und verwalten diese Aufgabenobjekte rechnerische Ausnahmen, sodass sie nicht abrupt beendet werden und die internen Ausnahmen nicht an diese Methode übergeben werden .

Wenn Sie ein Runnable einreichen, wird es in eine Zukunft verpackt.

Ihr afterExecute sollte ungefähr so ​​aussehen:

public final class ExtendedExecutor extends ThreadPoolExecutor {

    // ...

    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t == null && r instanceof Future<?>) {
            try {
                Future<?> future = (Future<?>) r;
                if (future.isDone()) {
                    future.get();
                }
            } catch (CancellationException ce) {
                t = ce;
            } catch (ExecutionException ee) {
                t = ee.getCause();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
        if (t != null) {
            System.out.println(t);
        }
    }
}
nr
quelle
7
Vielen Dank, ich habe diese Lösung verwendet. Falls jemand interessiert ist: Andere haben vorgeschlagen, den ExecutorService nicht zu unterklassifizieren, aber ich habe es trotzdem getan, weil ich Aufgaben überwachen wollte, während sie abgeschlossen sind, anstatt darauf zu warten, dass alle beendet werden, und dann get () für alle zurückgegebenen Futures aufzurufen .
Tom
1
Ein anderer Ansatz zur Unterklasse des Executors besteht darin, FutureTask zu unterklassifizieren und seine 'done'-Methode zu überschreiben
Nr.
1
Tom >> Können Sie bitte Ihren Beispiel-Snippet-Code veröffentlichen, in dem Sie ExecutorService unterklassifiziert haben, um Aufgaben zu überwachen, wenn sie abgeschlossen sind ...
Jagamot
1
Diese Antwort funktioniert nicht, wenn Sie ComplableFuture.runAsync verwenden, da afterExecute ein Objekt enthält, das paketprivat ist und nicht auf das Throwable zugreifen kann. Ich habe es umgangen, indem ich den Anruf abgeschlossen habe. Siehe meine Antwort unten.
mmm
2
Müssen wir überprüfen, ob die Zukunft mit abgeschlossen ist future.isDone()? Da afterExecutees ausgeführt wird, nachdem das Runnableabgeschlossen ist, gehe ich davon aus, dass es future.isDone()immer zurückkehrt true.
Searene
248

WARNUNG : Es ist zu beachten, dass diese Lösung den aufrufenden Thread blockiert.


Wenn Sie von der Aufgabe ausgelöste Ausnahmen verarbeiten möchten, ist es im Allgemeinen besser, diese zu verwenden Callableals Runnable.

Callable.call() Es ist erlaubt, geprüfte Ausnahmen auszulösen, und diese werden zurück an den aufrufenden Thread weitergegeben:

Callable task = ...
Future future = executor.submit(task);
try {
   future.get();
} catch (ExecutionException ex) {
   ex.getCause().printStackTrace();
}

Wenn Callable.call()eine Ausnahme ExecutionExceptionausgelöst wird , wird diese in eine eingeschlossen und von ausgelöst Future.get().

Dies ist wahrscheinlich einer Unterklasse vorzuziehen ThreadPoolExecutor. Sie haben auch die Möglichkeit, die Aufgabe erneut zu senden, wenn es sich um eine wiederherstellbare Ausnahme handelt.

Skaffman
quelle
5
> Callable.call () darf geprüfte Ausnahmen auslösen, und diese werden an den aufrufenden Thread zurückgegeben: Beachten Sie, dass die ausgelöste Ausnahme nur dann an den aufrufenden Thread weitergegeben wird, wenn future.get()oder dessen überladene Version aufgerufen wird.
nhylated
16
Es ist perfekt, aber was tun, wenn ich Aufgaben parallel ausführe und die Ausführung nicht blockieren möchte?
Grigory Kislin
43
Verwenden Sie diese Lösung nicht, da sie den gesamten Zweck der Verwendung des ExecutorService verletzt. Ein ExecutorService ist ein asynchroner Ausführungsmechanismus, mit dem Aufgaben im Hintergrund ausgeführt werden können. Wenn Sie future.get () direkt nach der Ausführung aufrufen, wird der aufrufende Thread blockiert, bis die Aufgabe abgeschlossen ist.
user1801374
2
Diese Lösung sollte nicht so hoch bewertet sein. Future.get () arbeitet synchron und fungiert als Blocker, bis Runnable oder Callable ausgeführt wurden. Wie oben angegeben, wird der Zweck der Verwendung des Executor Service
Super Hans
2
Wie #nhylated hervorhob, verdient dies einen JDK-BUG. Wenn Future.get () nicht aufgerufen wird, wird jede nicht erfasste Ausnahme von Callable stillschweigend ignoriert. Sehr schlechtes Design ... habe gerade mehr als einen Tag damit verbracht, eine Bibliothek zu finden, die dies verwendet, und JDK hat Ausnahmen stillschweigend ignoriert. Und das gibt es immer noch in jdk12.
Ben Jiang
18

Die Erklärung für dieses Verhalten finden Sie im Javadoc für afterExecute :

Hinweis: Wenn Aktionen entweder explizit oder über Methoden wie submit in Aufgaben (z. B. FutureTask) eingeschlossen sind, erfassen und verwalten diese Aufgabenobjekte rechnerische Ausnahmen, sodass sie nicht abrupt beendet werden und die internen Ausnahmen nicht an diese Methode übergeben werden .

Drew Wills
quelle
10

Ich habe es umgangen, indem ich die mitgelieferte ausführbare Datei verpackt habe, die dem Testamentsvollstrecker übermittelt wurde.

CompletableFuture.runAsync(() -> {
        try {
              runnable.run();
        } catch (Throwable e) {
              Log.info(Concurrency.class, "runAsync", e);
        }
}, executorService);
mmm
quelle
3
Sie können die Lesbarkeit mithilfe der whenComplete()Methode von verbessern CompletableFuture.
Eduard Wirch
@EduardWirch das funktioniert, aber Sie können keine Ausnahme von whenComplete ()
Akshat
7

Ich verwende eine VerboseRunnableKlasse aus jcabi-log , die alle Ausnahmen verschluckt und protokolliert. Sehr praktisch zum Beispiel:

import com.jcabi.log.VerboseRunnable;
scheduler.scheduleWithFixedDelay(
  new VerboseRunnable(
    Runnable() {
      public void run() { 
        // the code, which may throw
      }
    },
    true // it means that all exceptions will be swallowed and logged
  ),
  1, 1, TimeUnit.MILLISECONDS
);
yegor256
quelle
3

Eine andere Lösung wäre die Verwendung von ManagedTask und ManagedTaskListener .

Sie benötigen ein Callable oder Runnable, das die Schnittstelle ManagedTask implementiert .

Die Methode getManagedTaskListenergibt die gewünschte Instanz zurück.

public ManagedTaskListener getManagedTaskListener() {

Und Sie implementieren in ManagedTaskListener die taskDoneMethode:

@Override
public void taskDone(Future<?> future, ManagedExecutorService executor, Object task, Throwable exception) {
    if (exception != null) {
        LOGGER.log(Level.SEVERE, exception.getMessage());
    }
}

Weitere Details zum Lebenszyklus verwalteter Aufgaben und zum Listener .

CSchulz
quelle
2

Das funktioniert

  • Es ist von SingleThreadExecutor abgeleitet, kann jedoch problemlos angepasst werden
  • Java 8 Lamdas Code, aber einfach zu beheben

Es wird ein Executor mit einem einzelnen Thread erstellt, der viele Aufgaben ausführen kann. und wartet, bis die aktuelle Ausführung beendet ist, um mit der nächsten zu beginnen

Im Falle eines Uncaugth-Fehlers oder einer Ausnahme der uncaughtExceptionHandler ihn abfangen

öffentliche Abschlussklasse SingleThreadExecutorWithExceptions {

    public static ExecutorService newSingleThreadExecutorWithExceptions (endgültiger Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {

        ThreadFactory factory = (Runnable runnable) -> {
            final Thread newThread = neuer Thread (ausführbar "SingleThreadExecutorWithExceptions");
            newThread.setUncaughtExceptionHandler ((letzter Thread caugthThread, letzter Throwable Throwable) -> {
                uncaughtExceptionHandler.uncaughtException (caugthThread, throwable);
            });
            return newThread;
        };
        Geben Sie den neuen FinalizableDelegatedExecutorService zurück
                (neuer ThreadPoolExecutor (1, 1,
                        0L, TimeUnit.MILLISECONDS,
                        neue LinkedBlockingQueue (),
                        Fabrik){


                    protected void afterExecute (Runnable runnable, Throwable throwable) {
                        super.afterExecute (lauffähig, wirfbar);
                        if (throwable == null && ausführbare Instanz von Future) {
                            Versuchen {
                                Zukunft Zukunft = (Zukunft) lauffähig;
                                if (future.isDone ()) {
                                    future.get ();
                                }}
                            } catch (CancellationException ce) {
                                Wurf = ce;
                            } catch (ExecutionException ee) {
                                throwable = ee.getCause ();
                            } catch (InterruptedException dh) {
                                Thread.currentThread (). Interrupt (); // ignorieren / zurücksetzen
                            }}
                        }}
                        if (throwable! = null) {
                            uncaughtExceptionHandler.uncaughtException (Thread.currentThread (), throwable);
                        }}
                    }}
                });
    }}



    private statische Klasse FinalizableDelegatedExecutorService
            erweitert DelegatedExecutorService {
        FinalizableDelegatedExecutorService (ExecutorService-Executor) {
            super (Testamentsvollstrecker);
        }}
        protected void finalize () {
            super.shutdown ();
        }}
    }}

    / **
     * Eine Wrapper-Klasse, die nur die ExecutorService-Methoden verfügbar macht
     * einer ExecutorService-Implementierung.
     * /
    private statische Klasse DelegatedExecutorService erweitert AbstractExecutorService {
        privater endgültiger ExecutorService e;
        DelegatedExecutorService (ExecutorService-Executor) {e = Executor; }}
        public void execute (ausführbarer Befehl) {e.execute (Befehl); }}
        public void shutdown () {e.shutdown (); }}
        öffentliche Liste shutdownNow () {return e.shutdownNow (); }}
        public boolean isShutdown () {return e.isShutdown (); }}
        public boolean isTerminated () {return e.isTerminated (); }}
        public boolean awaitTermination (lange Zeitüberschreitung, TimeUnit-Einheit)
                löst InterruptedException aus {
            return e.awaitTermination (Timeout, Einheit);
        }}
        public Future submit (ausführbare Aufgabe) {
            return e.submit (Aufgabe);
        }}
        public Future submit (aufrufbare Aufgabe) {
            return e.submit (Aufgabe);
        }}
        public Future submit (ausführbare Aufgabe, T-Ergebnis) {
            return e.submit (Aufgabe, Ergebnis);
        }}
        öffentliche Liste> invokeAll (Sammlung> Aufgaben)
                löst InterruptedException aus {
            return e.invokeAll (Aufgaben);
        }}
        öffentliche Liste> invokeAll (Sammlung> Aufgaben,
                                             lange Zeitüberschreitung, TimeUnit-Einheit)
                löst InterruptedException aus {
            return e.invokeAll (Aufgaben, Zeitüberschreitung, Einheit);
        }}
        public T invokeAny (Sammlung> Aufgaben)
                löst InterruptedException, ExecutionException {aus
            return e.invokeAny (Aufgaben);
        }}
        public T invokeAny (Sammlung> Aufgaben,
                               lange Zeitüberschreitung, TimeUnit-Einheit)
                löst InterruptedException, ExecutionException, TimeoutException aus {
            return e.invokeAny (Aufgaben, Zeitüberschreitung, Einheit);
        }}
    }}



    private SingleThreadExecutorWithExceptions () {}
}}
obesga_tirant
quelle
Die Verwendung von finalize ist leider etwas instabil, da sie erst "später, wenn der Garbage Collector sie sammelt" aufgerufen wird (oder vielleicht nicht im Fall eines Threads, keine Ahnung) ...
Rogerdpack
1

Wenn Sie die Ausführung einer Aufgabe überwachen möchten, können Sie 1 oder 2 Threads (je nach Auslastung möglicherweise mehr) drehen und damit Aufgaben aus einem ExecutionCompletionService-Wrapper übernehmen.

Cristian Botiza
quelle
0

Wenn Sie ExecutorServicevon einer externen Quelle stammen (dh es ist nicht möglich, Unterklassen zu erstellen ThreadPoolExecutorund zu überschreiben afterExecute()), können Sie einen dynamischen Proxy verwenden, um das gewünschte Verhalten zu erzielen:

public static ExecutorService errorAware(final ExecutorService executor) {
    return (ExecutorService) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
            new Class[] {ExecutorService.class},
            (proxy, method, args) -> {
                if (method.getName().equals("submit")) {
                    final Object arg0 = args[0];
                    if (arg0 instanceof Runnable) {
                        args[0] = new Runnable() {
                            @Override
                            public void run() {
                                final Runnable task = (Runnable) arg0;
                                try {
                                    task.run();
                                    if (task instanceof Future<?>) {
                                        final Future<?> future = (Future<?>) task;

                                        if (future.isDone()) {
                                            try {
                                                future.get();
                                            } catch (final CancellationException ce) {
                                                // Your error-handling code here
                                                ce.printStackTrace();
                                            } catch (final ExecutionException ee) {
                                                // Your error-handling code here
                                                ee.getCause().printStackTrace();
                                            } catch (final InterruptedException ie) {
                                                Thread.currentThread().interrupt();
                                            }
                                        }
                                    }
                                } catch (final RuntimeException re) {
                                    // Your error-handling code here
                                    re.printStackTrace();
                                    throw re;
                                } catch (final Error e) {
                                    // Your error-handling code here
                                    e.printStackTrace();
                                    throw e;
                                }
                            }
                        };
                    } else if (arg0 instanceof Callable<?>) {
                        args[0] = new Callable<Object>() {
                            @Override
                            public Object call() throws Exception {
                                final Callable<?> task = (Callable<?>) arg0;
                                try {
                                    return task.call();
                                } catch (final Exception e) {
                                    // Your error-handling code here
                                    e.printStackTrace();
                                    throw e;
                                } catch (final Error e) {
                                    // Your error-handling code here
                                    e.printStackTrace();
                                    throw e;
                                }
                            }
                        };
                    }
                }
                return method.invoke(executor, args);
            });
}
Bass
quelle
0

Dies liegt daran, dass AbstractExecutorService :: submitSie wie unten runnablein RunnableFuture(nichts als FutureTask) eingewickelt werden

AbstractExecutorService.java

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null); /////////HERE////////
    execute(ftask);
    return ftask;
}

Dann executewird es weitergeleitet Workerund Worker.run()wird das Folgende aufrufen.

ThreadPoolExecutor.java

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();           /////////HERE////////
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

Schließlich wird task.run();im obigen Code der Aufruf aufgerufen FutureTask.run(). Hier ist der Code für die Ausnahmebehandlungsroutine. Aus diesem Grund erhalten Sie NICHT die erwartete Ausnahme.

class FutureTask<V> implements RunnableFuture<V>

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {   /////////HERE////////
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}
Kanagavelu Sugumar
quelle
0

Dies ähnelt der Lösung von mmm, ist jedoch etwas verständlicher. Lassen Sie Ihre Aufgaben eine abstrakte Klasse erweitern, die die run () -Methode umschließt.

public abstract Task implements Runnable {

    public abstract void execute();

    public void run() {
      try {
        execute();
      } catch (Throwable t) {
        // handle it  
      }
    }
}


public MySampleTask extends Task {
    public void execute() {
        // heavy, error-prone code here
    }
}
ccleve
quelle
-4

Anstatt ThreadPoolExecutor zu unterklassifizieren, würde ich eine ThreadFactory- Instanz bereitstellen , die neue Threads erstellt und ihnen einen UncaughtExceptionHandler zur Verfügung stellt

Kevin
quelle
3
Ich habe es auch versucht, aber die Methode uncaughtException scheint nie aufgerufen zu werden. Ich glaube, das liegt daran, dass ein Arbeitsthread in der ThreadPoolExecutor-Klasse die Ausnahmen abfängt.
Tom
5
Die uncaughtException-Methode wird nicht aufgerufen, da die Submit-Methode des ExecutorService die Callable / Runnable in einer Zukunft umschließt. Die Ausnahme wird dort erfasst.
Emil Sit