Fangen Sie die Ausnahme eines Threads im Aufrufer-Thread in Python ab

207

Ich bin sehr neu in Python und Multithread-Programmierung im Allgemeinen. Grundsätzlich habe ich ein Skript, das Dateien an einen anderen Speicherort kopiert. Ich möchte, dass dies in einem anderen Thread platziert wird, damit ich ausgeben kann, ....um anzuzeigen, dass das Skript noch ausgeführt wird.

Das Problem, das ich habe, ist, dass wenn die Dateien nicht kopiert werden können, es eine Ausnahme auslöst. Dies ist in Ordnung, wenn es im Hauptthread ausgeführt wird. Der folgende Code funktioniert jedoch nicht:

try:
    threadClass = TheThread(param1, param2, etc.)
    threadClass.start()   ##### **Exception takes place here**
except:
    print "Caught an exception"

In der Thread-Klasse selbst habe ich versucht, die Ausnahme erneut auszulösen, aber es funktioniert nicht. Ich habe Leute hier gesehen, die ähnliche Fragen gestellt haben, aber sie scheinen alle etwas Spezifischeres zu tun als das, was ich versuche (und ich verstehe die angebotenen Lösungen nicht ganz). Ich habe Leute gesehen, die die Verwendung von erwähnt haben sys.exc_info(), aber ich weiß nicht, wo oder wie ich sie verwenden soll.

Jede Hilfe wird sehr geschätzt!

EDIT: Der Code für die Thread-Klasse ist unten:

class TheThread(threading.Thread):
    def __init__(self, sourceFolder, destFolder):
        threading.Thread.__init__(self)
        self.sourceFolder = sourceFolder
        self.destFolder = destFolder

    def run(self):
        try:
           shul.copytree(self.sourceFolder, self.destFolder)
        except:
           raise
Phanto
quelle
Können Sie mehr Einblick in das geben, was in Ihnen passiert TheThread? Codebeispiel vielleicht?
Nathanismus
Sicher. Ich werde meine Antwort oben bearbeiten, um einige Details aufzunehmen.
Phanto
1
Haben Sie darüber nachgedacht, es umzuschalten, damit der Haupt-Thread das Bit ist, das Dinge erledigt, und die Fortschrittsanzeige im gespawnten Thread ist?
Dan Head
1
Dan Head, beziehen Sie sich auf den Haupt-Thread, der zuerst die Funktion "..." erzeugt und dann die Kopierfunktion ausführt? Das könnte funktionieren und das Ausnahmeproblem vermeiden. Aber ich würde immer noch gerne lernen, wie man Python richtig einfädelt.
Phanto

Antworten:

114

Das Problem ist, dass thread_obj.start()sofort zurückkehrt. Der untergeordnete Thread, den Sie erzeugt haben, wird in einem eigenen Kontext mit einem eigenen Stapel ausgeführt. Jede Ausnahme, die dort auftritt, befindet sich im Kontext des untergeordneten Threads und befindet sich in einem eigenen Stapel. Eine Möglichkeit, die ich mir derzeit vorstellen kann, um diese Informationen an den übergeordneten Thread zu übermitteln, ist die Verwendung einer Art Nachrichtenübermittlung.

Probieren Sie dies für die Größe an:

import sys
import threading
import Queue


class ExcThread(threading.Thread):

    def __init__(self, bucket):
        threading.Thread.__init__(self)
        self.bucket = bucket

    def run(self):
        try:
            raise Exception('An error occured here.')
        except Exception:
            self.bucket.put(sys.exc_info())


def main():
    bucket = Queue.Queue()
    thread_obj = ExcThread(bucket)
    thread_obj.start()

    while True:
        try:
            exc = bucket.get(block=False)
        except Queue.Empty:
            pass
        else:
            exc_type, exc_obj, exc_trace = exc
            # deal with the exception
            print exc_type, exc_obj
            print exc_trace

        thread_obj.join(0.1)
        if thread_obj.isAlive():
            continue
        else:
            break


if __name__ == '__main__':
    main()
Santa
quelle
5
Warum nicht statt dieser hässlichen while-Schleife dem Thread beitreten? Siehe das multiprocessingÄquivalent: gist.github.com/2311116
schlamar
1
Warum nicht das EventHook-Muster stackoverflow.com/questions/1092531/event-system-in-python/… basierend auf der Antwort von @Lasse verwenden? Eher als die Schleifensache?
Andre Miras
1
Die Warteschlange ist nicht das beste Mittel, um einen Fehler zurückzusenden, es sei denn, Sie möchten eine vollständige Warteschlange haben. Ein viel besseres Konstrukt ist Threading.Event ()
Muposat
1
Das scheint mir unsicher. Was passiert, wenn der Thread direkt nach dem bucket.get()Auslösen eine Ausnahme auslöst Queue.Empty? Dann wird der Thread join(0.1)abgeschlossen und isAlive() is Falseund Sie verpassen Ihre Ausnahme.
Steve
1
Queueist in diesem einfachen Fall nicht erforderlich - Sie können die Ausnahmeinformationen einfach als Eigenschaft von speichern, ExcThreadsolange Sie sicherstellen, dass run()sie direkt nach der Ausnahme abgeschlossen werden (was in diesem einfachen Beispiel der Fall ist). Dann lösen Sie die Ausnahme einfach nach (oder während) erneut aus t.join(). Es gibt keine Synchronisationsprobleme, da join()sichergestellt wird, dass der Thread abgeschlossen ist. Siehe Antwort von Rok Strniša unter stackoverflow.com/a/12223550/126362
ejm
41

Das concurrent.futuresModul macht es einfach, in separaten Threads (oder Prozessen) zu arbeiten und alle daraus resultierenden Ausnahmen zu behandeln:

import concurrent.futures
import shutil

def copytree_with_dots(src_path, dst_path):
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        # Execute the copy on a separate thread,
        # creating a future object to track progress.
        future = executor.submit(shutil.copytree, src_path, dst_path)

        while future.running():
            # Print pretty dots here.
            pass

        # Return the value returned by shutil.copytree(), None.
        # Raise any exceptions raised during the copy process.
        return future.result()

concurrent.futuresist in Python 3.2 enthalten und als backportiertes futuresModul für frühere Versionen verfügbar .

Jon-Eric
quelle
5
Dies tut zwar nicht genau das, was das OP verlangt hat, aber es ist genau der Hinweis, den ich brauchte. Danke dir.
Mad Physicist
2
Und mit concurrent.futures.as_completedkönnen Sie sofort benachrichtigt werden, wenn Ausnahmen ausgelöst
Ciro Santilli 14 冠状 病 六四 事件 法轮功
1
Dieser Code blockiert den Hauptthread. Wie macht man das asynchron?
Nikolay Shindarov
39

Es gibt viele wirklich seltsam komplizierte Antworten auf diese Frage. Vereinfache ich das zu stark, weil mir das für die meisten Dinge ausreicht?

from threading import Thread

class PropagatingThread(Thread):
    def run(self):
        self.exc = None
        try:
            if hasattr(self, '_Thread__target'):
                # Thread uses name mangling prior to Python 3.
                self.ret = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
            else:
                self.ret = self._target(*self._args, **self._kwargs)
        except BaseException as e:
            self.exc = e

    def join(self):
        super(PropagatingThread, self).join()
        if self.exc:
            raise self.exc
        return self.ret

Wenn Sie sicher sind, dass Sie immer nur auf der einen oder anderen Version von Python ausgeführt werden, können Sie die run()Methode auf die verstümmelte Version reduzieren (wenn Sie nur auf Versionen von Python vor 3 ausgeführt werden) oder Nur die saubere Version (wenn Sie nur mit Python-Versionen ab 3 arbeiten).

Anwendungsbeispiel:

def f(*args, **kwargs):
    print(args)
    print(kwargs)
    raise Exception('I suck at this')

t = PropagatingThread(target=f, args=(5,), kwargs={'hello':'world'})
t.start()
t.join()

Und Sie werden die Ausnahme sehen, die auf dem anderen Thread ausgelöst wird, wenn Sie beitreten.

Wenn Sie sixnur Python 3 verwenden oder verwenden, können Sie die Stack-Trace-Informationen verbessern, die Sie erhalten, wenn die Ausnahme erneut ausgelöst wird. Anstelle nur des Stapels am Punkt des Joins können Sie die innere Ausnahme in eine neue äußere Ausnahme einschließen und beide Stapelspuren mit abrufen

six.raise_from(RuntimeError('Exception in thread'),self.exc)

oder

raise RuntimeError('Exception in thread') from self.exc
ArtOfWarfare
quelle
1
Ich bin mir nicht sicher, warum diese Antwort auch nicht beliebter ist. Es gibt hier andere, die ebenfalls eine einfache Weitergabe durchführen, jedoch das Erweitern einer Klasse und das Überschreiben erfordern. Dieser macht genau das, was viele erwarten würden, und erfordert nur den Wechsel von Thread zu ProagatingThread. Und 4 Leerzeichen-Tabulatoren, so dass mein Kopieren / Einfügen trivial war :-) ... die einzige Verbesserung, die ich vorschlagen würde, ist die Verwendung von six.raise_from (), damit Sie einen schönen verschachtelten Satz von Stapelspuren erhalten, anstatt nur den Stapel für die Ort der Reraise.
aggieNick02
Vielen Dank. Sehr einfache Lösung.
Sonulohani
Mein Problem ist, dass ich mehrere untergeordnete Threads habe. Die Verknüpfungen werden nacheinander ausgeführt, und die Ausnahme wird möglicherweise von den später verknüpften Threads ausgelöst. Gibt es eine einfache Lösung für mein Problem? Join gleichzeitig ausführen?
Chuan
Danke, das funktioniert perfekt! Ich bin mir nicht sicher, warum es nicht direkt von Python behandelt wird, obwohl ...
GG.
Dies ist definitiv die nützlichste Antwort, diese Lösung ist viel allgemeiner als andere und dennoch einfach. Wird es in einem Projekt verwenden!
Konstantin Sekeresh
30

Obwohl es nicht möglich ist, eine in einem anderen Thread ausgelöste Ausnahme direkt abzufangen, finden Sie hier einen Code, mit dem Sie ganz transparent etwas erhalten können, das dieser Funktionalität sehr nahe kommt. Ihr untergeordneter Thread muss die ExThreadKlasse anstelle von threading.Threadklassifizieren und der übergeordnete Thread muss die child_thread.join_with_exception()Methode anstelle von aufrufen, child_thread.join()wenn er darauf wartet, dass der Thread seinen Job beendet.

Technische Details dieser Implementierung: Wenn der untergeordnete Thread eine Ausnahme auslöst, wird diese über a Queuean den übergeordneten Thread übergeben und im übergeordneten Thread erneut ausgelöst. Beachten Sie, dass bei diesem Ansatz kein Warten erforderlich ist.

#!/usr/bin/env python

import sys
import threading
import Queue

class ExThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.__status_queue = Queue.Queue()

    def run_with_exception(self):
        """This method should be overriden."""
        raise NotImplementedError

    def run(self):
        """This method should NOT be overriden."""
        try:
            self.run_with_exception()
        except BaseException:
            self.__status_queue.put(sys.exc_info())
        self.__status_queue.put(None)

    def wait_for_exc_info(self):
        return self.__status_queue.get()

    def join_with_exception(self):
        ex_info = self.wait_for_exc_info()
        if ex_info is None:
            return
        else:
            raise ex_info[1]

class MyException(Exception):
    pass

class MyThread(ExThread):
    def __init__(self):
        ExThread.__init__(self)

    def run_with_exception(self):
        thread_name = threading.current_thread().name
        raise MyException("An error in thread '{}'.".format(thread_name))

def main():
    t = MyThread()
    t.start()
    try:
        t.join_with_exception()
    except MyException as ex:
        thread_name = threading.current_thread().name
        print "Caught a MyException in thread '{}': {}".format(thread_name, ex)

if __name__ == '__main__':
    main()
Mateusz Kobos
quelle
1
Möchtest du nicht fangen BaseException, nicht Exception? Alles, was Sie tun, ist, die Ausnahme von einer Threadzur anderen zu verbreiten. Im Moment KeyboardInterruptwürde IE, a , stillschweigend ignoriert, wenn es in einem Hintergrund-Thread ausgelöst würde.
ArtOfWarfare
join_with_exceptionhängt auf unbestimmte Zeit, wenn ein zweites Mal an einem toten Thread aufgerufen wird. Fix: github.com/fraserharris/threading-extensions/blob/master/…
Fraser Harris
Ich denke nicht, dass Queuees notwendig ist; Siehe meinen Kommentar zu @ Santas Antwort. Sie können es bis zu etwas wie Rok Strnišas Antwort unter stackoverflow.com/a/12223550/126362
ejm
22

Wenn eine Ausnahme in einem Thread auftritt, ist es am besten, sie währenddessen während des Aufruferthreads erneut auszulösen join. Mit der sys.exc_info()Funktion können Sie Informationen zu der Ausnahme erhalten, die derzeit behandelt wird . Diese Informationen können einfach als Eigenschaft des Thread-Objekts gespeichert werden, bis sie joinaufgerufen werden. An diesem Punkt können sie erneut ausgelöst werden.

Beachten Sie, dass a Queue.Queue(wie in anderen Antworten vorgeschlagen) in diesem einfachen Fall nicht erforderlich ist, in dem der Thread höchstens 1 Ausnahme auslöst und direkt nach dem Auslösen einer Ausnahme abgeschlossen wird . Wir vermeiden Rennbedingungen, indem wir einfach darauf warten, dass der Thread abgeschlossen ist.

Erweitern Sie beispielsweise ExcThread(unten) und überschreiben Sie excRun(anstelle von run).

Python 2.x:

import threading

class ExcThread(threading.Thread):
  def excRun(self):
    pass

  def run(self):
    self.exc = None
    try:
      # Possibly throws an exception
      self.excRun()
    except:
      import sys
      self.exc = sys.exc_info()
      # Save details of the exception thrown but don't rethrow,
      # just complete the function

  def join(self):
    threading.Thread.join(self)
    if self.exc:
      msg = "Thread '%s' threw an exception: %s" % (self.getName(), self.exc[1])
      new_exc = Exception(msg)
      raise new_exc.__class__, new_exc, self.exc[2]

Python 3.x:

Das 3-Argument-Formular für raiseist in Python 3 nicht mehr vorhanden. Ändern Sie daher die letzte Zeile in:

raise new_exc.with_traceback(self.exc[2])
Rok Strniša
quelle
2
Warum verwenden Sie threading.Thread.join (self) anstelle von super (ExcThread, self) .join ()?
Richard Möhn
9

concurrent.futures.as_completed

https://docs.python.org/3.7/library/concurrent.futures.html#concurrent.futures.as_completed

Die folgende Lösung:

  • kehrt sofort zum Hauptthread zurück, wenn eine Ausnahme aufgerufen wird
  • erfordert keine zusätzlichen benutzerdefinierten Klassen, da Folgendes nicht erforderlich ist:
    • eine explizite Queue
    • um eine Ausnahme außer um Ihren Arbeitsthread hinzuzufügen

Quelle:

#!/usr/bin/env python3

import concurrent.futures
import time

def func_that_raises(do_raise):
    for i in range(3):
        print(i)
        time.sleep(0.1)
    if do_raise:
        raise Exception()
    for i in range(3):
        print(i)
        time.sleep(0.1)

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futures = []
    futures.append(executor.submit(func_that_raises, False))
    futures.append(executor.submit(func_that_raises, True))
    for future in concurrent.futures.as_completed(futures):
        print(repr(future.exception()))

Mögliche Ausgabe:

0
0
1
1
2
2
0
Exception()
1
2
None

Es ist leider nicht möglich, Futures zu töten, um die anderen abzubrechen, wenn einer fehlschlägt:

Wenn Sie etwas tun wie:

for future in concurrent.futures.as_completed(futures):
    if future.exception() is not None:
        raise future.exception()

dann withfängt der es und wartet, bis der zweite Faden fertig ist, bevor er fortfährt. Folgendes verhält sich ähnlich:

for future in concurrent.futures.as_completed(futures):
    future.result()

da future.result()löst die Ausnahme erneut aus, wenn eine aufgetreten ist.

Wenn Sie den gesamten Python-Prozess beenden möchten, kommen Sie möglicherweise davon os._exit(0), aber dies bedeutet wahrscheinlich, dass Sie einen Refactor benötigen.

Benutzerdefinierte Klasse mit perfekter Ausnahmesemantik

Am Ende habe ich die perfekte Oberfläche für mich selbst programmiert: Der richtige Weg, um die maximale Anzahl von Threads zu begrenzen, die gleichzeitig ausgeführt werden? Abschnitt "Warteschlangenbeispiel mit Fehlerbehandlung". Diese Klasse soll sowohl praktisch sein als auch Ihnen die vollständige Kontrolle über die Übermittlung und die Behandlung von Ergebnissen / Fehlern geben.

Getestet unter Python 3.6.7, Ubuntu 18.04.

Ciro Santilli 中心 改造 6 996ICU 六四 事件
quelle
4

Dies war ein böses kleines Problem, und ich würde gerne meine Lösung einbringen. Einige andere Lösungen, die ich gefunden habe (z. B. async.io), sahen vielversprechend aus, präsentierten aber auch eine Art Black Box. Der Ansatz der Warteschlangen- / Ereignisschleife bindet Sie an eine bestimmte Implementierung. Der Quellcode für gleichzeitige Futures umfasst jedoch nur etwa 1000 Zeilen und ist leicht zu verstehen . Dadurch konnte ich mein Problem leicht lösen: Ad-hoc-Arbeitsthreads ohne viel Setup erstellen und Ausnahmen im Hauptthread abfangen.

Meine Lösung verwendet die Concurrent Futures API und die Threading API. Sie können einen Worker erstellen, der Ihnen sowohl den Thread als auch die Zukunft bietet. Auf diese Weise können Sie dem Thread beitreten, um auf das Ergebnis zu warten:

worker = Worker(test)
thread = worker.start()
thread.join()
print(worker.future.result())

... oder Sie können den Mitarbeiter einfach einen Rückruf senden lassen, wenn Sie fertig sind:

worker = Worker(test)
thread = worker.start(lambda x: print('callback', x))

... oder Sie können eine Schleife ausführen, bis das Ereignis abgeschlossen ist:

worker = Worker(test)
thread = worker.start()

while True:
    print("waiting")
    if worker.future.done():
        exc = worker.future.exception()
        print('exception?', exc)
        result = worker.future.result()
        print('result', result)           
        break
    time.sleep(0.25)

Hier ist der Code:

from concurrent.futures import Future
import threading
import time

class Worker(object):
    def __init__(self, fn, args=()):
        self.future = Future()
        self._fn = fn
        self._args = args

    def start(self, cb=None):
        self._cb = cb
        self.future.set_running_or_notify_cancel()
        thread = threading.Thread(target=self.run, args=())
        thread.daemon = True #this will continue thread execution after the main thread runs out of code - you can still ctrl + c or kill the process
        thread.start()
        return thread

    def run(self):
        try:
            self.future.set_result(self._fn(*self._args))
        except BaseException as e:
            self.future.set_exception(e)

        if(self._cb):
            self._cb(self.future.result())

... und die Testfunktion:

def test(*args):
    print('args are', args)
    time.sleep(2)
    raise Exception('foo')
Calvin Froedge
quelle
2

Als Neuling im Threading habe ich lange gebraucht, um zu verstehen, wie man den Code von Mateusz Kobos (oben) implementiert. Hier ist eine geklärte Version, um zu verstehen, wie man sie benutzt.

#!/usr/bin/env python

import sys
import threading
import Queue

class ExThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.__status_queue = Queue.Queue()

    def run_with_exception(self):
        """This method should be overriden."""
        raise NotImplementedError

    def run(self):
        """This method should NOT be overriden."""
        try:
            self.run_with_exception()
        except Exception:
            self.__status_queue.put(sys.exc_info())
        self.__status_queue.put(None)

    def wait_for_exc_info(self):
        return self.__status_queue.get()

    def join_with_exception(self):
        ex_info = self.wait_for_exc_info()
        if ex_info is None:
            return
        else:
            raise ex_info[1]

class MyException(Exception):
    pass

class MyThread(ExThread):
    def __init__(self):
        ExThread.__init__(self)

    # This overrides the "run_with_exception" from class "ExThread"
    # Note, this is where the actual thread to be run lives. The thread
    # to be run could also call a method or be passed in as an object
    def run_with_exception(self):
        # Code will function until the int
        print "sleeping 5 seconds"
        import time
        for i in 1, 2, 3, 4, 5:
            print i
            time.sleep(1) 
        # Thread should break here
        int("str")
# I'm honestly not sure why these appear here? So, I removed them. 
# Perhaps Mateusz can clarify?        
#         thread_name = threading.current_thread().name
#         raise MyException("An error in thread '{}'.".format(thread_name))

if __name__ == '__main__':
    # The code lives in MyThread in this example. So creating the MyThread 
    # object set the code to be run (but does not start it yet)
    t = MyThread()
    # This actually starts the thread
    t.start()
    print
    print ("Notice 't.start()' is considered to have completed, although" 
           " the countdown continues in its new thread. So you code "
           "can tinue into new processing.")
    # Now that the thread is running, the join allows for monitoring of it
    try:
        t.join_with_exception()
    # should be able to be replace "Exception" with specific error (untested)
    except Exception, e: 
        print
        print "Exceptioon was caught and control passed back to the main thread"
        print "Do some handling here...or raise a custom exception "
        thread_name = threading.current_thread().name
        e = ("Caught a MyException in thread: '" + 
             str(thread_name) + 
             "' [" + str(e) + "]")
        raise Exception(e) # Or custom class of exception, such as MyException
BurningKrome
quelle
2

Ähnlich wie bei RickardSjogren ohne Queue, System usw., aber auch ohne einige Listener für Signale: Führen Sie direkt einen Ausnahmebehandler aus, der einem Ausnahmeblock entspricht.

#!/usr/bin/env python3

import threading

class ExceptionThread(threading.Thread):

    def __init__(self, callback=None, *args, **kwargs):
        """
        Redirect exceptions of thread to an exception handler.

        :param callback: function to handle occured exception
        :type callback: function(thread, exception)
        :param args: arguments for threading.Thread()
        :type args: tuple
        :param kwargs: keyword arguments for threading.Thread()
        :type kwargs: dict
        """
        self._callback = callback
        super().__init__(*args, **kwargs)

    def run(self):
        try:
            if self._target:
                self._target(*self._args, **self._kwargs)
        except BaseException as e:
            if self._callback is None:
                raise e
            else:
                self._callback(self, e)
        finally:
            # Avoid a refcycle if the thread is running a function with
            # an argument that has a member that points to the thread.
            del self._target, self._args, self._kwargs, self._callback

Nur self._callback und der Ausnahmeblock in run () sind zusätzlich zum normalen Threading.Thread.

Chickenmarkus
quelle
2

Ich weiß, dass ich hier etwas zu spät zur Party komme, aber ich hatte ein sehr ähnliches Problem, aber es beinhaltete die Verwendung von tkinter als GUI, und der Mainloop machte es unmöglich, eine der Lösungen zu verwenden, die von .join () abhängen. Daher habe ich die im EDIT der ursprünglichen Frage angegebene Lösung angepasst, sie jedoch allgemeiner gestaltet, um das Verständnis für andere zu erleichtern.

Hier ist die neue Thread-Klasse in Aktion:

import threading
import traceback
import logging


class ExceptionThread(threading.Thread):
    def __init__(self, *args, **kwargs):
        threading.Thread.__init__(self, *args, **kwargs)

    def run(self):
        try:
            if self._target:
                self._target(*self._args, **self._kwargs)
        except Exception:
            logging.error(traceback.format_exc())


def test_function_1(input):
    raise IndexError(input)


if __name__ == "__main__":
    input = 'useful'

    t1 = ExceptionThread(target=test_function_1, args=[input])
    t1.start()

Natürlich können Sie die Ausnahme jederzeit auf eine andere Weise als durch Protokollieren behandeln lassen, z. B. durch Ausdrucken oder Ausgeben an die Konsole.

Auf diese Weise können Sie die ExceptionThread-Klasse genau wie die Thread-Klasse ohne besondere Änderungen verwenden.

Firo
quelle
1

Eine Methode, die ich mag, basiert auf dem Beobachtermuster . Ich definiere eine Signalklasse, die mein Thread verwendet, um Ausnahmen an Listener zu senden. Es kann auch verwendet werden, um Werte von Threads zurückzugeben. Beispiel:

import threading

class Signal:
    def __init__(self):
        self._subscribers = list()

    def emit(self, *args, **kwargs):
        for func in self._subscribers:
            func(*args, **kwargs)

    def connect(self, func):
        self._subscribers.append(func)

    def disconnect(self, func):
        try:
            self._subscribers.remove(func)
        except ValueError:
            raise ValueError('Function {0} not removed from {1}'.format(func, self))


class WorkerThread(threading.Thread):

    def __init__(self, *args, **kwargs):
        super(WorkerThread, self).__init__(*args, **kwargs)
        self.Exception = Signal()
        self.Result = Signal()

    def run(self):
        if self._Thread__target is not None:
            try:
                self._return_value = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
            except Exception as e:
                self.Exception.emit(e)
            else:
                self.Result.emit(self._return_value)

if __name__ == '__main__':
    import time

    def handle_exception(exc):
        print exc.message

    def handle_result(res):
        print res

    def a():
        time.sleep(1)
        raise IOError('a failed')

    def b():
        time.sleep(2)
        return 'b returns'

    t = WorkerThread(target=a)
    t2 = WorkerThread(target=b)
    t.Exception.connect(handle_exception)
    t2.Result.connect(handle_result)
    t.start()
    t2.start()

    print 'Threads started'

    t.join()
    t2.join()
    print 'Done'

Ich habe nicht genug Erfahrung mit Threads, um zu behaupten, dass dies eine völlig sichere Methode ist. Aber es hat bei mir funktioniert und ich mag die Flexibilität.

RickardSjogren
quelle
Trennen Sie die Verbindung nach join ()?
ealeon
Ich nicht, aber ich denke, es wäre eine gute Idee, damit Sie keine Hinweise auf unbenutzte Dinge haben, die herumhängen.
RickardSjogren
Ich habe festgestellt, dass "handle_exception" immer noch Teil eines untergeordneten Threads ist. müssen Weg, um es an den Thread-Aufrufer zu übergeben
ealeon
1

Nackte Ausnahmen zu verwenden ist keine gute Praxis, da Sie normalerweise mehr fangen, als Sie erwarten.

Ich würde vorschlagen, das exceptzu ändern, um NUR die Ausnahme zu erfassen, die Sie behandeln möchten. Ich denke nicht, dass das Erhöhen den gewünschten Effekt hat, denn wenn Sie TheThreadim Äußeren instanziieren und tryeine Ausnahme auslösen, wird die Zuweisung niemals stattfinden.

Stattdessen möchten Sie möglicherweise nur darauf aufmerksam machen und fortfahren, z. B.:

def run(self):
    try:
       shul.copytree(self.sourceFolder, self.destFolder)
    except OSError, err:
       print err

Wenn diese Ausnahme abgefangen wird, können Sie sie dort behandeln. Wenn das Äußere tryeine Ausnahme abfängt TheThread, wissen Sie, dass es nicht die ist, die Sie bereits behandelt haben, und helfen Ihnen dabei, Ihren Prozessablauf zu isolieren.

Nathanismus
quelle
1
Nun, wenn es in diesem Thread überhaupt einen Fehler gibt, möchte ich, dass das vollständige Programm den Benutzer darüber informiert, dass ein Problem aufgetreten ist, und ordnungsgemäß beendet wird. Aus diesem Grund möchte ich, dass der Haupt-Thread alle Ausnahmen abfängt und behandelt. Es besteht jedoch weiterhin das Problem, dass, wenn TheThread eine Ausnahme auslöst, der Versuch / die Ausnahme des Hauptthreads diese immer noch nicht abfängt. Ich könnte den Thread die Ausnahme erkennen lassen und eine falsche Rückgabe zurückgeben, die angibt, dass die Operation nicht erfolgreich war. Das würde das gleiche gewünschte Ergebnis erzielen, aber ich würde immer noch gerne wissen, wie man eine Sub-Thread-Ausnahme richtig abfängt.
Phanto
1

Eine einfache Möglichkeit, die Ausnahme des Threads abzufangen und mit der Aufrufermethode zu kommunizieren, besteht darin, ein Wörterbuch oder eine Liste an die workerMethode zu übergeben.

Beispiel (Übergabe des Wörterbuchs an die Worker-Methode):

import threading

def my_method(throw_me):
    raise Exception(throw_me)

def worker(shared_obj, *args, **kwargs):
    try:
        shared_obj['target'](*args, **kwargs)
    except Exception as err:
        shared_obj['err'] = err

shared_obj = {'err':'', 'target': my_method}
throw_me = "Test"

th = threading.Thread(target=worker, args=(shared_obj, throw_me), kwargs={})
th.start()
th.join()

if shared_obj['err']:
    print(">>%s" % shared_obj['err'])
Rado Stoyanov
quelle
1

Wrap Thread mit Ausnahmespeicher.

import threading
import sys
class ExcThread(threading.Thread):

    def __init__(self, target, args = None):
        self.args = args if args else []
        self.target = target
        self.exc = None
        threading.Thread.__init__(self)

    def run(self):
        try:
            self.target(*self.args)
            raise Exception('An error occured here.')
        except Exception:
            self.exc=sys.exc_info()

def main():
    def hello(name):
        print(!"Hello, {name}!")
    thread_obj = ExcThread(target=hello, args=("Jack"))
    thread_obj.start()

    thread_obj.join()
    exc = thread_obj.exc
    if exc:
        exc_type, exc_obj, exc_trace = exc
        print(exc_type, ':',exc_obj, ":", exc_trace)

main()
ahuigo
quelle
0

pygolang stellt sync.WorkGroup bereit, die insbesondere Ausnahmen von erzeugten Arbeitsthreads an den Hauptthread weitergibt . Beispielsweise:

#!/usr/bin/env python
"""This program demostrates how with sync.WorkGroup an exception raised in
spawned thread is propagated into main thread which spawned the worker."""

from __future__ import print_function
from golang import sync, context

def T1(ctx, *argv):
    print('T1: run ... %r' % (argv,))
    raise RuntimeError('T1: problem')

def T2(ctx):
    print('T2: ran ok')

def main():
    wg = sync.WorkGroup(context.background())
    wg.go(T1, [1,2,3])
    wg.go(T2)

    try:
        wg.wait()
    except Exception as e:
        print('Tmain: caught exception: %r\n' %e)
        # reraising to see full traceback
        raise

if __name__ == '__main__':
    main()

gibt beim Ausführen Folgendes an:

T1: run ... ([1, 2, 3],)
T2: ran ok
Tmain: caught exception: RuntimeError('T1: problem',)

Traceback (most recent call last):
  File "./x.py", line 28, in <module>
    main()
  File "./x.py", line 21, in main
    wg.wait()
  File "golang/_sync.pyx", line 198, in golang._sync.PyWorkGroup.wait
    pyerr_reraise(pyerr)
  File "golang/_sync.pyx", line 178, in golang._sync.PyWorkGroup.go.pyrunf
    f(pywg._pyctx, *argv, **kw)
  File "./x.py", line 10, in T1
    raise RuntimeError('T1: problem')
RuntimeError: T1: problem

Der ursprüngliche Code aus der Frage wäre nur:

    wg = sync.WorkGroup(context.background())

    def _(ctx):
        shul.copytree(sourceFolder, destFolder)
    wg.go(_)

    # waits for spawned worker to complete and, on error, reraises
    # its exception on the main thread.
    wg.wait()
kirr
quelle