Zeitüberschreitung bei einem Funktionsaufruf

300

Ich rufe eine Funktion in Python auf, von der ich weiß, dass sie möglicherweise blockiert und mich zwingt, das Skript neu zu starten.

Wie rufe ich die Funktion auf oder was verpacke ich sie, damit das Skript sie abbricht und etwas anderes tut, wenn es länger als 5 Sekunden dauert?

Teifion
quelle

Antworten:

227

Sie können das Signalpaket verwenden , wenn Sie unter UNIX arbeiten:

In [1]: import signal

# Register an handler for the timeout
In [2]: def handler(signum, frame):
   ...:     print("Forever is over!")
   ...:     raise Exception("end of time")
   ...: 

# This function *may* run for an indetermined time...
In [3]: def loop_forever():
   ...:     import time
   ...:     while 1:
   ...:         print("sec")
   ...:         time.sleep(1)
   ...:         
   ...:         

# Register the signal function handler
In [4]: signal.signal(signal.SIGALRM, handler)
Out[4]: 0

# Define a timeout for your function
In [5]: signal.alarm(10)
Out[5]: 0

In [6]: try:
   ...:     loop_forever()
   ...: except Exception, exc: 
   ...:     print(exc)
   ....: 
sec
sec
sec
sec
sec
sec
sec
sec
Forever is over!
end of time

# Cancel the timer if the function returned before timeout
# (ok, mine won't but yours maybe will :)
In [7]: signal.alarm(0)
Out[7]: 0

10 Sekunden nach dem Aufruf alarm.alarm(10)wird der Handler aufgerufen. Dies löst eine Ausnahme aus, die Sie vom regulären Python-Code abfangen können.

Dieses Modul spielt nicht gut mit Threads (aber wer dann?)

Beachten Sie, dass eine Ausnahme, wenn eine Zeitüberschreitung auftritt, möglicherweise innerhalb der Funktion abgefangen und ignoriert wird, z. B. bei einer solchen Funktion:

def loop_forever():
    while 1:
        print('sec')
        try:
            time.sleep(10)
        except:
            continue
Piro
quelle
5
Ich benutze Python 2.5.4. Es liegt ein solcher Fehler vor: Traceback (letzter Aufruf zuletzt): Datei "aa.py", Zeile 85, in func signal.signal (signal.SIGALRM, Handler) AttributeError: Das Objekt 'module' hat kein Attribut 'SIGALRM'
flypen
11
@flypen das liegt daran signal.alarmund die dazugehörigen SIGALRMsind auf Windows-Plattformen nicht verfügbar.
Double AA
2
Wenn es viele Prozesse gibt und jeder Aufruf signal.signal--- funktionieren sie alle richtig? Wird nicht jeder signal.signalAnruf "gleichzeitig" abgebrochen?
Brownian
1
Warnung für diejenigen, die dies mit einer C-Erweiterung verwenden möchten: Der Python-Signalhandler wird erst aufgerufen, wenn die C-Funktion die Steuerung an den Python-Interpreter zurückgibt. Verwenden Sie für diesen Anwendungsfall die Antwort von ATOzTOA
wkschwartz
13
Ich stimme der Warnung über Threads zu. signal.alarm funktioniert nur im Haupt-Thread. Ich habe versucht, dies in Django-Ansichten zu verwenden - sofort fehlgeschlagen, wenn nur über den Hauptthread gesprochen wird.
JL Peyret
154

Sie können multiprocessing.Processgenau das verwenden.

Code

import multiprocessing
import time

# bar
def bar():
    for i in range(100):
        print "Tick"
        time.sleep(1)

if __name__ == '__main__':
    # Start bar as a process
    p = multiprocessing.Process(target=bar)
    p.start()

    # Wait for 10 seconds or until process finishes
    p.join(10)

    # If thread is still active
    if p.is_alive():
        print "running... let's kill it..."

        # Terminate
        p.terminate()
        p.join()
ATOzTOA
quelle
36
Wie kann ich den Rückgabewert der Zielmethode erhalten?
bad_keypoints
4
Dies scheint nicht zu funktionieren, wenn die aufgerufene Funktion in einem E / A-Block hängen bleibt.
Sudo
4
@bad_keypoints Siehe diese Antwort: stackoverflow.com/a/10415215/1384471 Grundsätzlich übergeben Sie eine Liste, in die Sie die Antwort einfügen .
Peter
1
@sudo dann entfernen Sie die join(). Dadurch wird Ihre x-Anzahl gleichzeitiger Unterprozesse ausgeführt, bis sie ihre Arbeit beendet haben, oder die in definierte Menge join(10). Wenn Sie eine blockierende E / A für 10 Prozesse haben, haben Sie sie mit join (10) so eingestellt, dass sie alle maximal 10 auf JEDEN Prozess warten, der gestartet wurde. Verwenden Sie das Daemon-Flag wie in diesem Beispiel stackoverflow.com/a/27420072/2480481 . Natürlich können Sie das Flag daemon=Truedirekt an die multiprocessing.Process()Funktion übergeben.
m3nda
2
@ATOzTOA Das Problem mit dieser Lösung ist, zumindest für meine Zwecke, dass es möglicherweise nicht zulässt, dass Kinderprofile nach sich selbst reinigen. Aus der Dokumentation der Terminate-Funktionterminate() ... Note that exit handlers and finally clauses, etc., will not be executed. Note that descendant processes of the process will not be terminated – they will simply become orphaned.
Abalcerek
78

Wie rufe ich die Funktion auf oder was verpacke ich sie, damit das Skript sie abbricht, wenn sie länger als 5 Sekunden dauert?

Ich habe einen Kern gepostet , der diese Frage / dieses Problem mit einem Dekorateur und einem löst threading.Timer. Hier ist es mit einer Panne.

Importe und Setups aus Kompatibilitätsgründen

Es wurde mit Python 2 und 3 getestet. Es sollte auch unter Unix / Linux und Windows funktionieren.

Zuerst die Importe. Diese versuchen, den Code unabhängig von der Python-Version konsistent zu halten:

from __future__ import print_function
import sys
import threading
from time import sleep
try:
    import thread
except ImportError:
    import _thread as thread

Verwenden Sie den versionunabhängigen Code:

try:
    range, _print = xrange, print
    def print(*args, **kwargs): 
        flush = kwargs.pop('flush', False)
        _print(*args, **kwargs)
        if flush:
            kwargs.get('file', sys.stdout).flush()            
except NameError:
    pass

Jetzt haben wir unsere Funktionalität aus der Standardbibliothek importiert.

exit_after Dekorateur

Als nächstes benötigen wir eine Funktion, um den main()vom untergeordneten Thread zu beenden :

def quit_function(fn_name):
    # print to stderr, unbuffered in Python 2.
    print('{0} took too long'.format(fn_name), file=sys.stderr)
    sys.stderr.flush() # Python 3 stderr is likely buffered.
    thread.interrupt_main() # raises KeyboardInterrupt

Und hier ist der Dekorateur selbst:

def exit_after(s):
    '''
    use as decorator to exit process if 
    function takes longer than s seconds
    '''
    def outer(fn):
        def inner(*args, **kwargs):
            timer = threading.Timer(s, quit_function, args=[fn.__name__])
            timer.start()
            try:
                result = fn(*args, **kwargs)
            finally:
                timer.cancel()
            return result
        return inner
    return outer

Verwendung

Und hier ist die Verwendung, die Ihre Frage zum Beenden nach 5 Sekunden direkt beantwortet!:

@exit_after(5)
def countdown(n):
    print('countdown started', flush=True)
    for i in range(n, -1, -1):
        print(i, end=', ', flush=True)
        sleep(1)
    print('countdown finished')

Demo:

>>> countdown(3)
countdown started
3, 2, 1, 0, countdown finished
>>> countdown(10)
countdown started
10, 9, 8, 7, 6, countdown took too long
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in inner
  File "<stdin>", line 6, in countdown
KeyboardInterrupt

Der zweite Funktionsaufruf wird nicht beendet, stattdessen sollte der Prozess mit einem Traceback beendet werden!

KeyboardInterrupt stoppt nicht immer einen schlafenden Faden

Beachten Sie, dass der Ruhezustand unter Python 2 unter Windows nicht immer durch einen Tastaturinterrupt unterbrochen wird, z.

@exit_after(1)
def sleep10():
    sleep(10)
    print('slept 10 seconds')

>>> sleep10()
sleep10 took too long         # Note that it hangs here about 9 more seconds
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in inner
  File "<stdin>", line 3, in sleep10
KeyboardInterrupt

Es ist auch nicht wahrscheinlich, dass Code, der in Erweiterungen ausgeführt wird, unterbrochen wird, es sei denn, er prüft explizit PyErr_CheckSignals(), ob Cython, Python und KeyboardInterrupt ignoriert werden

Ich würde es auf jeden Fall vermeiden, einen Thread länger als eine Sekunde zu schlafen - das ist ein Äon in der Prozessorzeit.

Wie rufe ich die Funktion auf oder was verpacke ich sie, damit das Skript sie abbricht und etwas anderes tut , wenn es länger als 5 Sekunden dauert?

Um es zu fangen und etwas anderes zu tun, können Sie den KeyboardInterrupt fangen.

>>> try:
...     countdown(10)
... except KeyboardInterrupt:
...     print('do something else')
... 
countdown started
10, 9, 8, 7, 6, countdown took too long
do something else
Aaron Hall
quelle
Ich habe Ihren ganzen Beitrag noch nicht gelesen, aber ich habe mich nur gefragt: Was ist, wenn Flush 0 ist? Das würde in der darunter liegenden if-Anweisung als falsch interpretiert werden, oder?
Koenraad van Duin
2
Warum muss ich anrufen thread.interrupt_main(), warum kann ich keine Ausnahme direkt auslösen?
Anirban Nag 'tintinmj'
Irgendwelche Gedanken multiprocessing.connection.Clientdazu? - Versuch zu lösen: stackoverflow.com/questions/57817955/…
wwii
51

Ich habe einen anderen Vorschlag, der eine reine Funktion ist (mit der gleichen API wie der Threading-Vorschlag) und anscheinend gut funktioniert (basierend auf Vorschlägen zu diesem Thread).

def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
    import signal

    class TimeoutError(Exception):
        pass

    def handler(signum, frame):
        raise TimeoutError()

    # set the timeout handler
    signal.signal(signal.SIGALRM, handler) 
    signal.alarm(timeout_duration)
    try:
        result = func(*args, **kwargs)
    except TimeoutError as exc:
        result = default
    finally:
        signal.alarm(0)

    return result
Alex
quelle
3
Sie sollten auch den ursprünglichen Signalhandler wiederherstellen. Siehe stackoverflow.com/questions/492519/…
Martin Konecny
9
Noch ein Hinweis: Die Unix-Signalmethode funktioniert nur, wenn Sie sie im Hauptthread anwenden. Das Anwenden in einem Sub-Thread löst eine Ausnahme aus und funktioniert nicht.
Martin Konecny
12
Dies ist nicht die beste Lösung, da sie nur unter Linux funktioniert.
Max
17
Max, nicht wahr - funktioniert unter jedem POSIX-kompatiblen Unix. Ich denke, Ihr Kommentar sollte genauer sein, funktioniert nicht unter Windows.
Chris Johnson
6
Sie sollten vermeiden, kwargs auf ein leeres Diktat zu setzen. Ein häufiges Python-Problem ist, dass Standardargumente für Funktionen veränderbar sind. Damit wird dieses Wörterbuch für alle Anrufe an freigegeben timeout. Es ist viel besser, die Standardeinstellung auf zu setzen Noneund in der ersten Zeile der Funktion hinzuzufügen kwargs = kwargs or {}. Args ist in Ordnung, da Tupel nicht veränderlich sind.
Scottmrogowski
32

Ich bin auf diesen Thread gestoßen, als ich bei Unit-Tests nach einem Timeout-Aufruf gesucht habe. Ich habe in den Antworten oder Paketen von Drittanbietern nichts Einfaches gefunden, also habe ich den Dekorateur unten geschrieben, den Sie direkt in den Code einfügen können:

import multiprocessing.pool
import functools

def timeout(max_timeout):
    """Timeout decorator, parameter in seconds."""
    def timeout_decorator(item):
        """Wrap the original function."""
        @functools.wraps(item)
        def func_wrapper(*args, **kwargs):
            """Closure for function."""
            pool = multiprocessing.pool.ThreadPool(processes=1)
            async_result = pool.apply_async(item, args, kwargs)
            # raises a TimeoutError if execution exceeds max_timeout
            return async_result.get(max_timeout)
        return func_wrapper
    return timeout_decorator

Dann ist es so einfach, einen Test oder eine beliebige Funktion zu deaktivieren:

@timeout(5.0)  # if execution takes longer than 5 seconds, raise a TimeoutError
def test_base_regression(self):
    ...
Reich
quelle
14
Seien Sie vorsichtig, da dies die Funktion nach Erreichen des Timeouts nicht beendet!
Sylvain
Beachten Sie, dass dies unter Windows einen völlig neuen Prozess hervorruft, der die Zeit bis zum Timeout in Anspruch nimmt, möglicherweise um ein Vielfaches, wenn die Einrichtung der Abhängigkeiten lange dauert.
Aaron Hall
1
Ja, dies muss angepasst werden. Es lässt Fäden für immer laufen.
Sudo
2
IDK, wenn dies der beste Weg ist, aber Sie können versuchen, Exceptioninnerhalb von func_wrapper zu pool.close()fangen, und nach dem Fang tun, um sicherzustellen, dass der Thread immer danach stirbt, egal was passiert . Dann kannst du werfen TimeoutErroroder was immer du willst. Scheint für mich zu arbeiten.
Sudo
2
Das ist nützlich, aber wenn ich es oft gemacht habe, bekomme ich RuntimeError: can't start new thread. Funktioniert es immer noch, wenn ich es ignoriere, oder kann ich noch etwas tun, um dies zu umgehen? Danke im Voraus!
Benjie
20

Das stopitauf pypi gefundene Paket scheint Timeouts gut zu verarbeiten.

Ich mag den @stopit.threading_timeoutableDekorator, der timeoutder dekorierten Funktion einen Parameter hinzufügt , der das tut, was Sie erwarten, er stoppt die Funktion.

Überprüfen Sie es auf pypi: https://pypi.python.org/pypi/stopit

egeland
quelle
1
Es ist sehr handlich und threadsicher! Danke und Plus eins! Dies ist die beste Option, die ich bisher gefunden habe und sogar besser als die akzeptierte Antwort !!
Yahya
Die Bibliothek behauptet, einige Funktionen funktionieren unter Windows nicht.
Stefan Simik
16

Es gibt viele Vorschläge, aber keine, die concurrent.futures verwenden. Ich denke, dies ist der am besten lesbare Weg, um damit umzugehen.

from concurrent.futures import ProcessPoolExecutor

# Warning: this does not terminate function if timeout
def timeout_five(fnc, *args, **kwargs):
    with ProcessPoolExecutor() as p:
        f = p.submit(fnc, *args, **kwargs)
        return f.result(timeout=5)

Super einfach zu lesen und zu warten.

Wir erstellen einen Pool, senden einen einzelnen Prozess und warten dann bis zu 5 Sekunden, bevor wir einen TimeoutError auslösen, den Sie nach Bedarf abfangen und verarbeiten können.

Nativ in Python 3.2+ und zurückportiert auf 2.7 (Pip Install Futures).

Das Wechseln zwischen Threads und Prozessen ist so einfach wie das Ersetzen ProcessPoolExecutordurch ThreadPoolExecutor.

Wenn Sie den Prozess mit Zeitüberschreitung beenden möchten, würde ich empfehlen, sich mit Pebble zu befassen .

Brian
quelle
2
Was bedeutet "Warnung: Dies beendet die Funktion nicht, wenn das Zeitlimit überschritten wird"?
Scott Stafford
5
@ScottStafford Prozesse / Threads enden nicht nur, weil ein TimeoutError ausgelöst wurde. Der Prozess oder der Thread versucht also weiterhin, vollständig ausgeführt zu werden, und gibt Ihnen bei Ihrem Timeout nicht automatisch die Kontrolle zurück.
Brian
Würde ich dadurch Ergebnisse speichern können, die zu diesem Zeitpunkt mittelschwer sind? Wenn ich beispielsweise eine rekursive Funktion habe, für die ich das Zeitlimit auf 5 gesetzt habe, und in dieser Zeit Teilergebnisse habe, wie schreibe ich die Funktion, um die Teilergebnisse bei Zeitüberschreitung zurückzugeben?
SumNeuron
Ich benutze genau das, aber ich habe 1000 Aufgaben, jede ist 5 Sekunden vor dem Timeout erlaubt. Mein Problem ist, dass Kerne bei Aufgaben verstopft werden, die niemals enden, da das Zeitlimit nur auf die Gesamtzahl der Aufgaben angewendet wird, nicht auf einzelne Aufgaben. concurrent.futures bietet keine Lösung für dieses Problem.
Bastiaan
12

Großartiger, benutzerfreundlicher und zuverlässiger Timeout-Dekorator für PyPi- Projekte ( https://pypi.org/project/timeout-decorator/ )

Installation :

pip install timeout-decorator

Verwendung :

import time
import timeout_decorator

@timeout_decorator.timeout(5)
def mytest():
    print "Start"
    for i in range(1,10):
        time.sleep(1)
        print "%d seconds have passed" % i

if __name__ == '__main__':
    mytest()
Gil
quelle
2
Ich schätze die klare Lösung. Aber könnte jemand erklären, wie diese Bibliothek funktioniert, besonders wenn es um Multithreading geht? Persönlich fürchte ich, einen unbekannten Machanismus zu verwenden, um mit Threads oder Signalen umzugehen.
wsysuper
@wsysuper die lib hat 2 Betriebsarten: Öffnen Sie einen neuen Thread oder einen neuen Unterprozess (der vermutlich threadsicher ist)
Gil
das hat bei mir sehr gut funktioniert!
Florian Heigl vor
6

Ich bin der Autor von wrapt_timeout_decorator

Die meisten der hier vorgestellten Lösungen funktionieren auf den ersten Blick wunderbar unter Linux - weil wir Fork () und Signale () haben -, aber unter Windows sehen die Dinge etwas anders aus. Und wenn es um Subthreads unter Linux geht, können Sie keine Signale mehr verwenden.

Um einen Prozess unter Windows zu erzeugen, muss er auswählbar sein - und viele dekorierte Funktionen oder Klassenmethoden nicht.

Sie müssen also einen besseren Pickler wie Dill und Multiprocess verwenden (nicht Pickle und Multiprocessing) - deshalb können Sie ProcessPoolExecutor nicht verwenden (oder nur mit eingeschränkter Funktionalität).

Für das Timeout selbst - Sie müssen definieren, was Timeout bedeutet -, da unter Windows eine beträchtliche (und nicht bestimmbare) Zeit benötigt wird, um den Prozess zu erzeugen. Dies kann bei kurzen Zeitüberschreitungen schwierig sein. Nehmen wir an, das Laichen des Prozesses dauert ungefähr 0,5 Sekunden (leicht !!!). Wenn Sie eine Zeitüberschreitung von 0,2 Sekunden angeben, was soll passieren? Sollte die Funktion nach 0,5 + 0,2 Sekunden ablaufen (lassen Sie die Methode also 0,2 Sekunden lang laufen)? Oder sollte der aufgerufene Prozess nach 0,2 Sekunden eine Zeitüberschreitung aufweisen (in diesem Fall wird die dekorierte Funktion IMMER eine Zeitüberschreitung aufweisen, da sie in dieser Zeit nicht einmal erzeugt wird)?

Auch verschachtelte Dekorateure können böse sein und Sie können keine Signale in einem Unterfaden verwenden. Wenn Sie einen wirklich universellen, plattformübergreifenden Dekorateur erstellen möchten, muss dies alles berücksichtigt (und getestet) werden.

Andere Probleme sind die Rückgabe von Ausnahmen an den Aufrufer sowie Protokollierungsprobleme (falls in der dekorierten Funktion verwendet - die Protokollierung in Dateien in einem anderen Prozess wird NICHT unterstützt)

Ich habe versucht, alle Randfälle abzudecken. Sie könnten in das Paket wrapt_timeout_decorator schauen oder zumindest Ihre eigenen Lösungen testen, die von den dort verwendeten Unittests inspiriert sind.

@Alexis Eggermont - leider habe ich nicht genug Punkte zum Kommentieren - vielleicht kann dich jemand anderes benachrichtigen - ich glaube, ich habe dein Importproblem gelöst.

Bitranox
quelle
3

timeout-decoratorfunktioniert nicht auf Windows-System, da Windows nicht signalgut unterstützt.

Wenn Sie Timeout-Decorator im Windows-System verwenden, erhalten Sie Folgendes

AttributeError: module 'signal' has no attribute 'SIGALRM'

Einige schlugen vor, zu verwenden use_signals=False, arbeiteten aber nicht für mich.

Der Autor @bitranox hat das folgende Paket erstellt:

pip install https://github.com/bitranox/wrapt-timeout-decorator/archive/master.zip

Codebeispiel:

import time
from wrapt_timeout_decorator import *

@timeout(5)
def mytest(message):
    print(message)
    for i in range(1,10):
        time.sleep(1)
        print('{} seconds have passed'.format(i))

def main():
    mytest('starting')


if __name__ == '__main__':
    main()

Gibt die folgende Ausnahme:

TimeoutError: Function mytest timed out after 5 seconds
als ob
quelle
Das klingt nach einer sehr schönen Lösung. Seltsamerweise from wrapt_timeout_decorator import * scheint die Linie einige meiner anderen Importe zu töten. Zum Beispiel bekomme ich ModuleNotFoundError: No module named 'google.appengine', aber ich bekomme diesen Fehler nicht, wenn ich wrapt_timeout_decorator nicht importiere
Alexis Eggermont
@AlexisEggermont Ich wollte dies mit Appengine verwenden ... also bin ich sehr neugierig, ob dieser Fehler weiterhin besteht?
PascalVKooten
2

Wir können Signale dafür verwenden. Ich denke, das folgende Beispiel wird für Sie nützlich sein. Es ist sehr einfach im Vergleich zu Threads.

import signal

def timeout(signum, frame):
    raise myException

#this is an infinite loop, never ending under normal circumstances
def main():
    print 'Starting Main ',
    while 1:
        print 'in main ',

#SIGALRM is only usable on a unix platform
signal.signal(signal.SIGALRM, timeout)

#change 5 to however many seconds you need
signal.alarm(5)

try:
    main()
except myException:
    print "whoops"
AR
quelle
1
Es wäre besser, eine bestimmte Ausnahme zu wählen und nur sie zu fangen. Nackt try: ... except: ...sind immer eine schlechte Idee.
Hivert
Ich stimme dir zu, hivert.
AR
Obwohl ich den Grund verstehe, bin ich als Systemadministrator / Integrator anderer Meinung - Python-Code ist dafür berüchtigt, die Fehlerbehandlung zu vernachlässigen, und die Behandlung der einen Sache, die Sie erwarten, ist für hochwertige Software nicht gut genug. Sie können die 5 Dinge, für die Sie planen, und eine generische Strategie für andere Dinge handhaben. "Traceback, None" ist keine Strategie, es ist eine Beleidigung.
Florian Heigl vor
2
#!/usr/bin/python2
import sys, subprocess, threading
proc = subprocess.Popen(sys.argv[2:])
timer = threading.Timer(float(sys.argv[1]), proc.terminate)
timer.start()
proc.wait()
timer.cancel()
exit(proc.returncode)
Hal Canary
quelle
7
Während dieser Code die Frage beantworten kann, würde die Bereitstellung eines zusätzlichen Kontexts darüber, wie und / oder warum das Problem gelöst wird, den langfristigen Wert der Antwort verbessern
Dan Cornilescu
1

Ich brauchte verschachtelbare zeitgesteuerte Interrupts (was SIGALARM nicht kann), die nicht durch time.sleep blockiert werden (was der threadbasierte Ansatz nicht kann). Am Ende habe ich Code von hier kopiert und leicht modifiziert: http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/

Der Code selbst:

#!/usr/bin/python

# lightly modified version of http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/


"""alarm.py: Permits multiple SIGALRM events to be queued.

Uses a `heapq` to store the objects to be called when an alarm signal is
raised, so that the next alarm is always at the top of the heap.
"""

import heapq
import signal
from time import time

__version__ = '$Revision: 2539 $'.split()[1]

alarmlist = []

__new_alarm = lambda t, f, a, k: (t + time(), f, a, k)
__next_alarm = lambda: int(round(alarmlist[0][0] - time())) if alarmlist else None
__set_alarm = lambda: signal.alarm(max(__next_alarm(), 1))


class TimeoutError(Exception):
    def __init__(self, message, id_=None):
        self.message = message
        self.id_ = id_


class Timeout:
    ''' id_ allows for nested timeouts. '''
    def __init__(self, id_=None, seconds=1, error_message='Timeout'):
        self.seconds = seconds
        self.error_message = error_message
        self.id_ = id_
    def handle_timeout(self):
        raise TimeoutError(self.error_message, self.id_)
    def __enter__(self):
        self.this_alarm = alarm(self.seconds, self.handle_timeout)
    def __exit__(self, type, value, traceback):
        try:
            cancel(self.this_alarm) 
        except ValueError:
            pass


def __clear_alarm():
    """Clear an existing alarm.

    If the alarm signal was set to a callable other than our own, queue the
    previous alarm settings.
    """
    oldsec = signal.alarm(0)
    oldfunc = signal.signal(signal.SIGALRM, __alarm_handler)
    if oldsec > 0 and oldfunc != __alarm_handler:
        heapq.heappush(alarmlist, (__new_alarm(oldsec, oldfunc, [], {})))


def __alarm_handler(*zargs):
    """Handle an alarm by calling any due heap entries and resetting the alarm.

    Note that multiple heap entries might get called, especially if calling an
    entry takes a lot of time.
    """
    try:
        nextt = __next_alarm()
        while nextt is not None and nextt <= 0:
            (tm, func, args, keys) = heapq.heappop(alarmlist)
            func(*args, **keys)
            nextt = __next_alarm()
    finally:
        if alarmlist: __set_alarm()


def alarm(sec, func, *args, **keys):
    """Set an alarm.

    When the alarm is raised in `sec` seconds, the handler will call `func`,
    passing `args` and `keys`. Return the heap entry (which is just a big
    tuple), so that it can be cancelled by calling `cancel()`.
    """
    __clear_alarm()
    try:
        newalarm = __new_alarm(sec, func, args, keys)
        heapq.heappush(alarmlist, newalarm)
        return newalarm
    finally:
        __set_alarm()


def cancel(alarm):
    """Cancel an alarm by passing the heap entry returned by `alarm()`.

    It is an error to try to cancel an alarm which has already occurred.
    """
    __clear_alarm()
    try:
        alarmlist.remove(alarm)
        heapq.heapify(alarmlist)
    finally:
        if alarmlist: __set_alarm()

und ein Anwendungsbeispiel:

import alarm
from time import sleep

try:
    with alarm.Timeout(id_='a', seconds=5):
        try:
            with alarm.Timeout(id_='b', seconds=2):
                sleep(3)
        except alarm.TimeoutError as e:
            print 'raised', e.id_
        sleep(30)
except alarm.TimeoutError as e:
    print 'raised', e.id_
else:
    print 'nope.'
James
quelle
Dies verwendet auch ein Signal und funktioniert daher nicht, wenn es von einem Thread aufgerufen wird.
Garg10Mai
0

Hier ist eine leichte Verbesserung der gegebenen threadbasierten Lösung.

Der folgende Code unterstützt Ausnahmen :

def runFunctionCatchExceptions(func, *args, **kwargs):
    try:
        result = func(*args, **kwargs)
    except Exception, message:
        return ["exception", message]

    return ["RESULT", result]


def runFunctionWithTimeout(func, args=(), kwargs={}, timeout_duration=10, default=None):
    import threading
    class InterruptableThread(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
            self.result = default
        def run(self):
            self.result = runFunctionCatchExceptions(func, *args, **kwargs)
    it = InterruptableThread()
    it.start()
    it.join(timeout_duration)
    if it.isAlive():
        return default

    if it.result[0] == "exception":
        raise it.result[1]

    return it.result[1]

Aufrufen mit einer Zeitüberschreitung von 5 Sekunden:

result = timeout(remote_calculate, (myarg,), timeout_duration=5)
Diemacht
quelle
1
Dies löst eine neue Ausnahme aus, die den ursprünglichen Traceback verbirgt. Siehe meine Version unten ...
Meitham
1
Dies ist auch unsicher, als ob innerhalb runFunctionCatchExceptions()bestimmter Python-Funktionen das Erhalten von GIL aufgerufen wird. Zum Beispiel würde das Folgende niemals oder für sehr lange Zeit zurückkehren, wenn es innerhalb der Funktion aufgerufen wird : eval(2**9999999999**9999999999). Siehe stackoverflow.com/questions/22138190/…
Mikko Ohtamaa