In meinem Code befindet sich ein Socket-bezogener Funktionsaufruf. Diese Funktion stammt von einem anderen Modul und ist daher außerhalb meiner Kontrolle. Das Problem besteht darin, dass sie gelegentlich stundenlang blockiert, was völlig inakzeptabel ist. Wie kann ich die Ausführungszeit der Funktion von meinem Code aus begrenzen? Ich denke, die Lösung muss einen anderen Thread verwenden.
python
multithreading
Übrigens
quelle
quelle
Antworten:
Ich bin mir nicht sicher, wie plattformübergreifend dies sein könnte, aber die Verwendung von Signalen und Alarmen könnte eine gute Sichtweise darauf sein. Mit ein wenig Arbeit können Sie dies auch vollständig generisch und in jeder Situation verwendbar machen.
http://docs.python.org/library/signal.html
Ihr Code wird also ungefähr so aussehen.
import signal def signal_handler(signum, frame): raise Exception("Timed out!") signal.signal(signal.SIGALRM, signal_handler) signal.alarm(10) # Ten seconds try: long_function_call() except Exception, msg: print "Timed out!"
quelle
Eine Verbesserung der Antwort von @ rik.the.vik wäre, die
with
Anweisung zu verwenden, um der Timeout-Funktion etwas syntaktischen Zucker zu geben:import signal from contextlib import contextmanager class TimeoutException(Exception): pass @contextmanager def time_limit(seconds): def signal_handler(signum, frame): raise TimeoutException("Timed out!") signal.signal(signal.SIGALRM, signal_handler) signal.alarm(seconds) try: yield finally: signal.alarm(0) try: with time_limit(10): long_function_call() except TimeoutException as e: print("Timed out!")
quelle
try: yield \n finally: signal.alarm(0)
Hier ist eine Linux / OSX-Methode, um die Laufzeit einer Funktion zu begrenzen. Dies ist der Fall, wenn Sie keine Threads verwenden möchten und Ihr Programm warten soll, bis die Funktion endet oder das Zeitlimit abläuft.
from multiprocessing import Process from time import sleep def f(time): sleep(time) def run_with_limited_time(func, args, kwargs, time): """Runs a function with time limit :param func: The function to run :param args: The functions args, given as tuple :param kwargs: The functions keywords, given as dict :param time: The time limit in seconds :return: True if the function ended successfully. False if it was terminated. """ p = Process(target=func, args=args, kwargs=kwargs) p.start() p.join(time) if p.is_alive(): p.terminate() return False return True if __name__ == '__main__': print run_with_limited_time(f, (1.5, ), {}, 2.5) # True print run_with_limited_time(f, (3.5, ), {}, 2.5) # False
quelle
Ich bevorzuge einen Kontextmanager-Ansatz, da er die Ausführung mehrerer Python-Anweisungen innerhalb einer
with time_limit
Anweisung ermöglicht. Da Windows-System nicht hatSIGALARM
, könnte eine portablere und möglicherweise einfachere Methode die Verwendung von a seinTimer
from contextlib import contextmanager import threading import _thread class TimeoutException(Exception): def __init__(self, msg=''): self.msg = msg @contextmanager def time_limit(seconds, msg=''): timer = threading.Timer(seconds, lambda: _thread.interrupt_main()) timer.start() try: yield except KeyboardInterrupt: raise TimeoutException("Timed out for operation {}".format(msg)) finally: # if the action ends in specified time, timer is canceled timer.cancel() import time # ends after 5 seconds with time_limit(5, 'sleep'): for i in range(10): time.sleep(1) # this will actually end after 10 seconds with time_limit(5, 'sleep'): time.sleep(10)
Die Schlüsseltechnik hier ist die Verwendung,
_thread.interrupt_main
um den Haupt-Thread vom Timer-Thread zu unterbrechen. Eine Einschränkung ist, dass der Hauptfaden nicht immer auf dasKeyboardInterrupt
durch dasTimer
schnelle Anheben reagiert . Ruft beispielsweisetime.sleep()
eine Systemfunktion auf, sodass aKeyboardInterrupt
nach demsleep
Aufruf behandelt wird.quelle
thread
Modul_thread
in Python 3 umbenannt wurde, müssten Sie ausimport sys if sys.version_info[0] < 3: from thread import interrupt_main else: from _thread import interrupt_main
timer = threading.Timer(seconds, lambda: interrupt_main())
Dies innerhalb eines Signalhandlers zu tun, ist gefährlich: Sie befinden sich möglicherweise zum Zeitpunkt des Auslösens der Ausnahme in einem Ausnahmebehandler und lassen die Dinge in einem fehlerhaften Zustand. Zum Beispiel,
def function_with_enforced_timeout(): f = open_temporary_file() try: ... finally: here() unlink(f.filename)
Wenn Ihre Ausnahme hier () ausgelöst wird, wird die temporäre Datei niemals gelöscht.
Die Lösung besteht darin, dass asynchrone Ausnahmen verschoben werden, bis sich der Code nicht mehr im Code für die Ausnahmebehandlung befindet (ein Ausnahme- oder Endblock), aber Python tut dies nicht.
Beachten Sie, dass dies beim Ausführen von nativem Code nichts unterbricht. Es wird nur unterbrochen, wenn die Funktion zurückkehrt, sodass dies in diesem speziellen Fall möglicherweise nicht hilft. (SIGALRM selbst kann den blockierenden Anruf unterbrechen. Der Socket-Code wird jedoch normalerweise nach einem EINTR einfach erneut versucht.)
Dies mit Threads zu tun, ist eine bessere Idee, da es portabler ist als Signale. Da Sie einen Arbeitsthread starten und blockieren, bis er abgeschlossen ist, gibt es keine der üblichen Parallelitätsprobleme. Leider gibt es keine Möglichkeit, eine Ausnahme asynchron an einen anderen Thread in Python zu senden (andere Thread-APIs können dies tun). Es gibt auch das gleiche Problem beim Senden einer Ausnahme während eines Ausnahmebehandlungsprogramms und erfordert das gleiche Update.
quelle
PyErr_CheckSignals()
vor dem Neustart auf EINTR aufgerufen, damit ein Python-Signalhandler ausgeführt werden kann.Sie müssen keine Threads verwenden. Sie können einen anderen Prozess verwenden, um die Blockierungsarbeit auszuführen, z. B. mithilfe des Unterprozessmoduls . Wenn Sie Datenstrukturen zwischen verschiedenen Teilen Ihres Programms austauschen möchten, ist Twisted eine großartige Bibliothek, um sich selbst die Kontrolle darüber zu geben. Ich würde es empfehlen, wenn Sie sich für das Blockieren interessieren und erwarten, dass diese Probleme häufig auftreten. Die schlechte Nachricht bei Twisted ist, dass Sie Ihren Code neu schreiben müssen, um Blockierungen zu vermeiden, und es gibt eine faire Lernkurve.
Sie können Threads verwenden, um ein Blockieren zu vermeiden, aber ich würde dies als letzten Ausweg betrachten, da Sie dadurch einer ganzen Welt voller Schmerzen ausgesetzt sind. Lesen Sie ein gutes Buch über Parallelität, bevor Sie überhaupt über die Verwendung von Threads in der Produktion nachdenken, z. B. Jean Bacons "Concurrent Systems". Ich arbeite mit einer Menge Leute zusammen, die wirklich coole Hochleistungs-Sachen mit Threads machen, und wir führen Threads nicht in Projekte ein, es sei denn, wir brauchen sie wirklich.
quelle
Die einzige "sichere" Möglichkeit, dies in einer beliebigen Sprache zu tun, besteht darin, einen sekundären Prozess zu verwenden, um diese Zeitüberschreitung auszuführen. Andernfalls müssen Sie Ihren Code so erstellen, dass er von selbst sicher eine Zeitüberschreitung aufweist, z. B. durch Überprüfen der in einer Schleife oder ähnlichem verstrichenen Zeit. Wenn das Ändern der Methode keine Option ist, reicht ein Thread nicht aus.
Warum? Weil Sie riskieren, die Dinge in einem schlechten Zustand zu lassen, wenn Sie dies tun. Wenn der Thread während der Methode einfach beendet wird, werden gehaltene Sperren usw. nur gehalten und können nicht freigegeben werden.
Schauen Sie sich also den Prozess an, nicht den Thread.
quelle
Normalerweise würde ich lieber einen Kontextmanager verwenden, wie von @ josh-lee vorgeschlagen
Falls jedoch jemand daran interessiert ist, dies als Dekorateur zu implementieren, ist hier eine Alternative.
So würde es aussehen:
import time from timeout import timeout class Test(object): @timeout(2) def test_a(self, foo, bar): print foo time.sleep(1) print bar return 'A Done' @timeout(2) def test_b(self, foo, bar): print foo time.sleep(3) print bar return 'B Done' t = Test() print t.test_a('python', 'rocks') print t.test_b('timing', 'out')
Und das ist das
timeout.py
Modul:import threading class TimeoutError(Exception): pass class InterruptableThread(threading.Thread): def __init__(self, func, *args, **kwargs): threading.Thread.__init__(self) self._func = func self._args = args self._kwargs = kwargs self._result = None def run(self): self._result = self._func(*self._args, **self._kwargs) @property def result(self): return self._result class timeout(object): def __init__(self, sec): self._sec = sec def __call__(self, f): def wrapped_f(*args, **kwargs): it = InterruptableThread(f, *args, **kwargs) it.start() it.join(self._sec) if not it.is_alive(): return it.result raise TimeoutError('execution expired') return wrapped_f
Die Ausgabe:
Beachten Sie, dass
TimeoutError
die dekorierte Methode auch dann in einem anderen Thread ausgeführt wird , wenn sie ausgelöst wird. Wenn Sie auch möchten, dass dieser Thread "gestoppt" wird, lesen Sie: Gibt es eine Möglichkeit, einen Thread in Python zu beenden?quelle
Hier ist eine Timeout-Funktion, die ich über Google gefunden habe und die für mich funktioniert.
Von: http://code.activestate.com/recipes/473878/
def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None): '''This function will spwan a thread and run the given function using the args, kwargs and return the given default value if the timeout_duration is exceeded ''' import threading class InterruptableThread(threading.Thread): def __init__(self): threading.Thread.__init__(self) self.result = default def run(self): try: self.result = func(*args, **kwargs) except: self.result = default it = InterruptableThread() it.start() it.join(timeout_duration) if it.isAlive(): return it.result else: return it.result
quelle
Die Methode von @ user2283347 wurde getestet, aber wir möchten die Traceback-Nachrichten entfernen. Verwenden Sie den Pass-Trick von Remove traceback in Python unter Strg-C . Der geänderte Code lautet:
from contextlib import contextmanager import threading import _thread class TimeoutException(Exception): pass @contextmanager def time_limit(seconds): timer = threading.Timer(seconds, lambda: _thread.interrupt_main()) timer.start() try: yield except KeyboardInterrupt: pass finally: # if the action ends in specified time, timer is canceled timer.cancel() def timeout_svm_score(i): #from sklearn import svm #import numpy as np #from IPython.core.display import display #%store -r names X Y clf = svm.SVC(kernel='linear', C=1).fit(np.nan_to_num(X[[names[i]]]), Y) score = clf.score(np.nan_to_num(X[[names[i]]]),Y) #scoressvm.append((score, names[i])) display((score, names[i])) %%time with time_limit(5): i=0 timeout_svm_score(i) #Wall time: 14.2 s %%time with time_limit(20): i=0 timeout_svm_score(i) #(0.04541284403669725, '计划飞行时间') #Wall time: 16.1 s %%time with time_limit(5): i=14 timeout_svm_score(i) #Wall time: 5h 43min 41s
Wir können sehen, dass diese Methode sehr viel Zeit benötigt, um die Berechnung zu unterbrechen. Wir haben um 5 Sekunden gebeten, aber sie funktioniert in 5 Stunden.
quelle