Tastatur Interrupts mit dem Multiprozessor-Pool von Python

136

Wie kann ich mit KeyboardInterrupt-Ereignissen mit den Multiprocessing-Pools von Python umgehen? Hier ist ein einfaches Beispiel:

from multiprocessing import Pool
from time import sleep
from sys import exit

def slowly_square(i):
    sleep(1)
    return i*i

def go():
    pool = Pool(8)
    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        # **** THIS PART NEVER EXECUTES. ****
        pool.terminate()
        print "You cancelled the program!"
        sys.exit(1)
    print "\nFinally, here are the results: ", results

if __name__ == "__main__":
    go()

Wenn KeyboardInterruptich den obigen Code ausführe, wird der beim Drücken ausgelöst ^C, aber der Prozess hängt einfach an diesem Punkt und ich muss ihn extern beenden.

Ich möchte jederzeit drücken ^Ckönnen und alle Prozesse ordnungsgemäß beenden.

Fragsworth
quelle
Ich habe mein Problem mit psutil gelöst. Die Lösung finden Sie hier: stackoverflow.com/questions/32160054/…
Tiago Albineli Motta

Antworten:

137

Dies ist ein Python-Fehler. Beim Warten auf eine Bedingung in threading.Condition.wait () wird KeyboardInterrupt niemals gesendet. Repro:

import threading
cond = threading.Condition(threading.Lock())
cond.acquire()
cond.wait(None)
print "done"

Die KeyboardInterrupt-Ausnahme wird erst ausgeliefert, wenn wait () zurückgegeben wird, und sie wird nie zurückgegeben, sodass der Interrupt niemals auftritt. KeyboardInterrupt sollte mit ziemlicher Sicherheit eine Wartezeit unterbrechen.

Beachten Sie, dass dies nicht der Fall ist, wenn eine Zeitüberschreitung angegeben wird. cond.wait (1) erhält den Interrupt sofort. Eine Problemumgehung besteht darin, ein Zeitlimit anzugeben. Ersetzen Sie dazu

    results = pool.map(slowly_square, range(40))

mit

    results = pool.map_async(slowly_square, range(40)).get(9999999)

o.ä.

Glenn Maynard
quelle
3
Ist dieser Fehler irgendwo im offiziellen Python-Tracker? Ich habe Probleme, es zu finden, aber ich verwende wahrscheinlich nicht die besten Suchbegriffe.
Joseph Garvin
18
Dieser Fehler wurde als [Ausgabe 8296] [1] abgelegt. [1]: bugs.python.org/issue8296
Andrey Vlasovskikh
1
Hier ist ein Hack, der pool.imap () auf die gleiche Weise behebt und Strg-C ermöglicht, wenn über imap iteriert wird. Fangen Sie die Ausnahme ab und rufen Sie pool.terminate () auf. Ihr Programm wird beendet. gist.github.com/626518
Alexander Ljungberg
6
Das behebt die Dinge nicht ganz. Manchmal erhalte ich das erwartete Verhalten, wenn ich Strg + C drücke, manchmal nicht. Ich bin mir nicht sicher warum, aber es sieht so aus, als würde The KeyboardInterrupt von einem der Prozesse zufällig empfangen, und ich erhalte nur dann das richtige Verhalten, wenn der übergeordnete Prozess derjenige ist, der es abfängt.
Ryan C. Thompson
6
Dies funktioniert bei Python 3.6.1 unter Windows nicht. Ich bekomme Tonnen von Stapelspuren und anderem Müll, wenn ich Strg-C mache, dh wie ohne eine solche Problemumgehung. Tatsächlich scheint keine der Lösungen, die ich in diesem Thread ausprobiert habe, zu funktionieren ...
szx
56

Nach dem, was ich kürzlich gefunden habe, besteht die beste Lösung darin, die Arbeitsprozesse so einzurichten, dass SIGINT vollständig ignoriert wird, und den gesamten Bereinigungscode auf den übergeordneten Prozess zu beschränken. Dies behebt das Problem sowohl für inaktive als auch für ausgelastete Arbeitsprozesse und erfordert keinen Fehlerbehandlungscode in Ihren untergeordneten Prozessen.

import signal

...

def init_worker():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

...

def main()
    pool = multiprocessing.Pool(size, init_worker)

    ...

    except KeyboardInterrupt:
        pool.terminate()
        pool.join()

Erläuterungen und den vollständigen Beispielcode finden Sie unter http://noswap.com/blog/python-multiprocessing-keyboardinterrupt/ bzw. http://github.com/jreese/multiprocessing-keyboardinterrupt .

John Reese
quelle
4
Hallo John. Ihre Lösung erreicht nicht das Gleiche wie meine, ja leider komplizierte Lösung. Es versteckt sich hinter dem time.sleep(10)im Hauptprozess. Wenn Sie diesen Ruhezustand entfernen oder warten, bis der Prozess versucht, sich dem Pool anzuschließen, was Sie tun müssen, um sicherzustellen, dass die Jobs abgeschlossen sind, leiden Sie immer noch unter demselben Problem, das der Hauptprozess nicht hat Empfangen Sie den KeyboardInterrupt nicht, während er auf den Abfragevorgang wartet join.
Bboe
In dem Fall, in dem ich diesen Code in der Produktion verwendet habe, war time.sleep () Teil einer Schleife, die den Status jedes untergeordneten Prozesses überprüft und bestimmte Prozesse bei Bedarf mit einer Verzögerung neu startet. Anstatt mit join () zu warten, bis alle Prozesse abgeschlossen sind, werden sie einzeln überprüft, um sicherzustellen, dass der Master-Prozess reagiert.
John Reese
2
Es war also eher ein arbeitsreiches Warten (möglicherweise mit kleinen Schlafzeiten zwischen den Überprüfungen), das den Prozessabschluss über eine andere Methode abfragte, als sich anzuschließen? In diesem Fall ist es möglicherweise besser, diesen Code in Ihren Blog-Beitrag aufzunehmen, da Sie dann garantieren können, dass alle Mitarbeiter abgeschlossen sind, bevor Sie versuchen, Mitglied zu werden.
Bboe
4
Das funktioniert nicht. Nur den Kindern wird das Signal gesendet. Der Elternteil erhält es pool.terminate()nie , wird also nie ausgeführt. Wenn die Kinder das Signal ignorieren, wird nichts erreicht. @ Glenns Antwort löst das Problem.
Cerin
1
Meine Version davon ist unter gist.github.com/admackin/003dd646e5fadee8b8d6 ; Es wird .join()nur bei Unterbrechung aufgerufen. Es überprüft einfach manuell das Ergebnis der .apply_async()Verwendung, AsyncResult.ready()um festzustellen, ob es bereit ist, was bedeutet, dass wir sauber fertig sind.
Andy MacKinlay
29

Aus bestimmten Gründen werden nur von der Basisklasse geerbte Ausnahmen Exceptionnormal behandelt. Als Abhilfe können , können Sie Ihre re-raisen KeyboardInterruptals ExceptionBeispiel:

from multiprocessing import Pool
import time

class KeyboardInterruptError(Exception): pass

def f(x):
    try:
        time.sleep(x)
        return x
    except KeyboardInterrupt:
        raise KeyboardInterruptError()

def main():
    p = Pool(processes=4)
    try:
        print 'starting the pool map'
        print p.map(f, range(10))
        p.close()
        print 'pool map complete'
    except KeyboardInterrupt:
        print 'got ^C while pool mapping, terminating the pool'
        p.terminate()
        print 'pool is terminated'
    except Exception, e:
        print 'got exception: %r, terminating the pool' % (e,)
        p.terminate()
        print 'pool is terminated'
    finally:
        print 'joining pool processes'
        p.join()
        print 'join complete'
    print 'the end'

if __name__ == '__main__':
    main()

Normalerweise erhalten Sie die folgende Ausgabe:

staring the pool map
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
pool map complete
joining pool processes
join complete
the end

Wenn Sie also treffen ^C, erhalten Sie:

staring the pool map
got ^C while pool mapping, terminating the pool
pool is terminated
joining pool processes
join complete
the end
Andrey Vlasovskikh
quelle
2
Es scheint, dass dies keine vollständige Lösung ist. Wenn a KeyboardInterruptankommt, während multiprocessinges seinen eigenen IPC-Datenaustausch durchführt, try..catchwird das (offensichtlich) nicht aktiviert.
Andrey Vlasovskikh
Sie könnten durch raise KeyboardInterruptErrorein ersetzen return. Sie müssen nur sicherstellen, dass der untergeordnete Prozess beendet wird, sobald KeyboardInterrupt empfangen wird. Der Rückgabewert scheint ignoriert zu werden, maindennoch wird der KeyboardInterrupt empfangen.
Bernhard
8

Normalerweise funktioniert diese einfache Struktur für Ctrl- Con Pool:

def signal_handle(_signal, frame):
    print "Stopping the Jobs."

signal.signal(signal.SIGINT, signal_handle)

Wie in einigen ähnlichen Beiträgen angegeben:

Erfassen Sie Tastaturinterrupts in Python ohne Try-Except

igco
quelle
1
Dies müsste auch für jeden Worker-Prozess durchgeführt werden und kann immer noch fehlschlagen, wenn der KeyboardInterrupt ausgelöst wird, während die Multiprozessor-Bibliothek initialisiert wird.
MarioVilas
7

Die abgestimmte Antwort befasst sich nicht mit dem Kernproblem, sondern mit einem ähnlichen Nebeneffekt.

Jesse Noller, der Autor der Multiprocessing-Bibliothek, erklärt, wie man multiprocessing.Poolin einem alten Blog-Beitrag richtig mit STRG + C umgeht .

import signal
from multiprocessing import Pool


def initializer():
    """Ignore CTRL+C in the worker process."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)


pool = Pool(initializer=initializer)

try:
    pool.map(perform_download, dowloads)
except KeyboardInterrupt:
    pool.terminate()
    pool.join()
noxdafox
quelle
Ich habe festgestellt, dass ProcessPoolExecutor das gleiche Problem hat. Die einzige Lösung, die ich finden konnte, war, os.setpgrp()aus der Zukunft heraus
anzurufen
1
Sicher, der einzige Unterschied ist, dass ProcessPoolExecutorInitialisierungsfunktionen nicht unterstützt werden. Unter Unix können Sie die forkStrategie nutzen, indem Sie den Sighandler für den Hauptprozess deaktivieren, bevor Sie den Pool erstellen und anschließend wieder aktivieren. In Pebble schweige ich SIGINTstandardmäßig zu den untergeordneten Prozessen. Mir ist nicht bekannt, warum sie mit den Python-Pools nicht dasselbe tun. Am Ende könnte der Benutzer den SIGINTHandler zurücksetzen, falls er sich selbst verletzen möchte.
Noxdafox
Diese Lösung scheint zu verhindern, dass Strg-C auch den Hauptprozess unterbricht.
Paul Price
1
Ich habe gerade Python 3.5 getestet und es funktioniert. Welche Version von Python verwenden Sie? Welches Betriebssystem?
Noxdafox
5

Es scheint, dass es zwei Probleme gibt, die Ausnahmen bei der Mehrfachverarbeitung nerven. Die erste (von Glenn notierte) ist, dass Sie map_asynceine Zeitüberschreitung verwenden mapmüssen, um eine sofortige Antwort zu erhalten (dh die Verarbeitung der gesamten Liste nicht abzuschließen). Die zweite (von Andrey notierte) ist, dass Multiprocessing keine Ausnahmen abfängt, die nicht von Exception(z SystemExit. B. ) erben . Hier ist meine Lösung, die sich mit beiden befasst:

import sys
import functools
import traceback
import multiprocessing

def _poolFunctionWrapper(function, arg):
    """Run function under the pool

    Wrapper around function to catch exceptions that don't inherit from
    Exception (which aren't caught by multiprocessing, so that you end
    up hitting the timeout).
    """
    try:
        return function(arg)
    except:
        cls, exc, tb = sys.exc_info()
        if issubclass(cls, Exception):
            raise # No worries
        # Need to wrap the exception with something multiprocessing will recognise
        import traceback
        print "Unhandled exception %s (%s):\n%s" % (cls.__name__, exc, traceback.format_exc())
        raise Exception("Unhandled exception: %s (%s)" % (cls.__name__, exc))

def _runPool(pool, timeout, function, iterable):
    """Run the pool

    Wrapper around pool.map_async, to handle timeout.  This is required so as to
    trigger an immediate interrupt on the KeyboardInterrupt (Ctrl-C); see
    http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool

    Further wraps the function in _poolFunctionWrapper to catch exceptions
    that don't inherit from Exception.
    """
    return pool.map_async(functools.partial(_poolFunctionWrapper, function), iterable).get(timeout)

def myMap(function, iterable, numProcesses=1, timeout=9999):
    """Run the function on the iterable, optionally with multiprocessing"""
    if numProcesses > 1:
        pool = multiprocessing.Pool(processes=numProcesses, maxtasksperchild=1)
        mapFunc = functools.partial(_runPool, pool, timeout)
    else:
        pool = None
        mapFunc = map
    results = mapFunc(function, iterable)
    if pool is not None:
        pool.close()
        pool.join()
    return results
Paul Price
quelle
1
Ich habe keine Leistungseinbußen bemerkt, aber in meinem Fall functionist die ziemlich langlebig (Hunderte von Sekunden).
Paul Price
Dies ist tatsächlich nicht mehr der Fall, zumindest aus meinen Augen und meiner Erfahrung. Wenn Sie die Tastaturausnahme in den einzelnen untergeordneten Prozessen abfangen und im Hauptprozess erneut abfangen, können Sie sie weiter verwenden mapund alles ist in Ordnung. @Linux Cli Aikhat unten eine Lösung bereitgestellt, die dieses Verhalten erzeugt. Die Verwendung map_asyncist nicht immer erwünscht, wenn der Hauptthread von den Ergebnissen der untergeordneten Prozesse abhängt.
Code Doggo
4

Ich fand vorerst, dass die beste Lösung darin besteht, nicht die Funktion multiprocessing.pool zu verwenden, sondern Ihre eigene Poolfunktionalität zu erweitern. Ich habe ein Beispiel bereitgestellt, das den Fehler mit apply_async demonstriert, sowie ein Beispiel, das zeigt, wie die Poolfunktionalität insgesamt vermieden werden kann.

http://www.bryceboe.com/2010/08/26/python-multiprocessing-and-keyboardinterrupt/

bboe
quelle
Klappt wunderbar. Es ist eine saubere Lösung und keine Art von Hack (/ ich denke) .btw, der Trick mit .get (99999), wie von anderen vorgeschlagen, beeinträchtigt die Leistung erheblich.
Walter
Ich habe keine Leistungseinbußen bei der Verwendung eines Timeouts festgestellt, obwohl ich 9999 anstelle von 999999 verwendet habe. Die Ausnahme ist, wenn eine Ausnahme ausgelöst wird, die nicht von der Exception-Klasse erbt: Dann müssen Sie warten, bis das Timeout erreicht ist schlagen. Die Lösung dafür besteht darin, alle Ausnahmen abzufangen (siehe meine Lösung).
Paul Price
1

Ich bin ein Neuling in Python. Ich habe überall nach Antworten gesucht und bin auf diesen und einige andere Blogs und Youtube-Videos gestoßen. Ich habe versucht, den obigen Code des Autors zu kopieren, einzufügen und auf meinem Python 2.7.13 in Windows 7 64-Bit zu reproduzieren. Es ist nah an dem, was ich erreichen möchte.

Ich habe meine untergeordneten Prozesse dazu gebracht, die ControlC zu ignorieren und den übergeordneten Prozess zu beenden. Es sieht so aus, als würde das Umgehen des untergeordneten Prozesses dieses Problem für mich vermeiden.

#!/usr/bin/python

from multiprocessing import Pool
from time import sleep
from sys import exit


def slowly_square(i):
    try:
        print "<slowly_square> Sleeping and later running a square calculation..."
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print "<child processor> Don't care if you say CtrlC"
        pass


def go():
    pool = Pool(8)

    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        pool.terminate()
        pool.close()
        print "You cancelled the program!"
        exit(1)
    print "Finally, here are the results", results


if __name__ == '__main__':
    go()

Der Teil, der bei beginnt, pool.terminate()scheint nie ausgeführt zu werden.

Linux Cli Aik
quelle
Ich habe das auch gerade herausgefunden! Ich denke ehrlich, dies ist die beste Lösung für ein Problem wie dieses. Die akzeptierte Lösung drängt map_asyncauf den Benutzer, was mir nicht besonders gefällt. In vielen Situationen wie meiner muss der Haupt-Thread warten, bis die einzelnen Prozesse abgeschlossen sind. Dies ist einer der Gründe, warum es mapexistiert!
Code Doggo
1

Sie können versuchen, die Methode apply_async eines Pool-Objekts wie folgt zu verwenden:

import multiprocessing
import time
from datetime import datetime


def test_func(x):
    time.sleep(2)
    return x**2


def apply_multiprocessing(input_list, input_function):
    pool_size = 5
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=10)

    try:
        jobs = {}
        for value in input_list:
            jobs[value] = pool.apply_async(input_function, [value])

        results = {}
        for value, result in jobs.items():
            try:
                results[value] = result.get()
            except KeyboardInterrupt:
                print "Interrupted by user"
                pool.terminate()
                break
            except Exception as e:
                results[value] = e
        return results
    except Exception:
        raise
    finally:
        pool.close()
        pool.join()


if __name__ == "__main__":
    iterations = range(100)
    t0 = datetime.now()
    results1 = apply_multiprocessing(iterations, test_func)
    t1 = datetime.now()
    print results1
    print "Multi: {}".format(t1 - t0)

    t2 = datetime.now()
    results2 = {i: test_func(i) for i in iterations}
    t3 = datetime.now()
    print results2
    print "Non-multi: {}".format(t3 - t2)

Ausgabe:

100
Multiprocessing run time: 0:00:41.131000
100
Non-multiprocessing run time: 0:03:20.688000

Ein Vorteil dieser Methode besteht darin, dass Ergebnisse, die vor der Unterbrechung verarbeitet wurden, im Ergebniswörterbuch zurückgegeben werden:

>>> apply_multiprocessing(range(100), test_func)
Interrupted by user
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25}
bparker856
quelle
Herrliches und vollständiges Beispiel
eMTy
-5

Seltsamerweise sieht es so aus, als müsste man auch mit KeyboardInterruptden Kindern umgehen . Ich hätte erwartet, dass dies wie geschrieben funktioniert ... versuchen Sie es slowly_squaremit:

def slowly_square(i):
    try:
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print 'You EVIL bastard!'
        return 0

Das sollte wie erwartet funktionieren.

D. Shawley
quelle
1
Ich habe es versucht, und es beendet nicht den gesamten Satz von Jobs. Es beendet die aktuell ausgeführten Jobs, aber das Skript weist die verbleibenden Jobs im Aufruf pool.map weiterhin so zu, als ob alles normal wäre.
Fragsworth
Dies ist in Ordnung, aber Sie verlieren möglicherweise den Überblick über auftretende Fehler. Das Zurückgeben des Fehlers mit einem Stacktrace funktioniert möglicherweise, sodass der übergeordnete Prozess erkennen kann, dass ein Fehler aufgetreten ist, der jedoch nicht sofort beendet wird, wenn der Fehler auftritt.
Mehtunguh