Beenden Sie ein Multithread-Python-Programm

73

Wie kann ein Python-Programm mit mehreren Threads auf Strg + C-Schlüsselereignis reagieren?

Bearbeiten: Der Code ist wie folgt:

import threading
current = 0

class MyThread(threading.Thread):
    def __init__(self, total):
        threading.Thread.__init__(self)
        self.total = total

    def stop(self):
        self._Thread__stop()

    def run(self):
        global current
        while current<self.total:
            lock = threading.Lock()
            lock.acquire()
            current+=1
            lock.release()
            print current

if __name__=='__main__':

    threads = []
    thread_count = 10
    total = 10000
    for i in range(0, thread_count):
        t = MyThread(total)
        t.setDaemon(True)
        threads.append(t)
    for i in range(0, thread_count):
        threads[i].start()

Ich habe versucht, join () für alle Threads zu entfernen, aber es funktioniert immer noch nicht. Liegt es daran, dass das Sperrsegment in der run () -Prozedur jedes Threads enthalten ist?

Bearbeiten: Der obige Code soll funktionieren, wird jedoch immer unterbrochen, wenn die aktuelle Variable im Bereich von 5.000 bis 6.000 liegt und die folgenden Fehler auftreten

Exception in thread Thread-4 (most likely raised during interpreter shutdown):
Traceback (most recent call last):
  File "/usr/lib/python2.5/threading.py", line 486, in __bootstrap_inner
  File "test.py", line 20, in run
<type 'exceptions.TypeError'>: unsupported operand type(s) for +=: 'NoneType' and 'int'
Exception in thread Thread-2 (most likely raised during interpreter shutdown):
Traceback (most recent call last):
  File "/usr/lib/python2.5/threading.py", line 486, in __bootstrap_inner
  File "test.py", line 22, in run
Jack
quelle
FWIW, ich stoße auf dasselbe Problem, aber mit dem neueren concurrent.futuresModul . Ich versuche immer noch herauszufinden, ob oder wie eine der Lösungen hier von threadingzu übersetzt wird concurrent.futures.
Nick Chammas

Antworten:

93

Machen Sie jeden Thread außer dem Haupt-Thread zu einem Daemon ( t.daemon = Truein 2.6 oder besser, t.setDaemon(True)in 2.6 oder weniger, für jedes Thread-Objekt, tbevor Sie es starten). Auf diese Weise wird der gesamte Prozess beendet, wenn der Hauptthread den KeyboardInterrupt empfängt, ihn nicht abfängt oder abfängt, aber trotzdem beendet werden soll. Siehe die Dokumente .

edit : Nachdem ich gerade den Code des OP (nicht ursprünglich veröffentlicht) und die Behauptung gesehen habe, dass "es nicht funktioniert", muss ich anscheinend hinzufügen ...:

Natürlich, wenn Sie Ihren Hauptthread wollen ansprechbar bleiben (zB zur Steuerung-C), nicht mire es nicht in blockierende Aufrufe, wie joineinen anderen Thread ing - vor allem nicht völlig nutzlos Sperr Anrufe, wie joining - Daemon - Threads. Ändern Sie zum Beispiel einfach die letzte Schleife im Haupt-Thread von der aktuellen (völlig und schädlich):

for i in range(0, thread_count):
    threads[i].join()

zu etwas Vernünftigerem wie:

while threading.active_count() > 0:
    time.sleep(0.1)

Wenn Ihr Main nichts Besseres zu tun hat, als entweder alle Threads von selbst zu beenden oder ein Control-C (oder ein anderes Signal) zu empfangen.

Natürlich gibt es viele andere verwendbare Muster, wenn Sie lieber möchten, dass Ihre Threads nicht abrupt beendet werden (wie es bei dämonischen Threads der Fall sein kann) - es sei denn, auch sie stecken für immer in bedingungslos blockierenden Anrufen, Deadlocks und dergleichen ;-) .

Alex Martelli
quelle
Ah, Sie tätigen einen (nutzlosen) Sperranruf - also gibt es natürlich keine Antwort auf Control-C. Die Lösung ist ziemlich einfach: Machen Sie keine nutzlosen Blockierungsanrufe, wenn Sie reaktionsschnell bleiben möchten. Lassen Sie mich meine Antwort bearbeiten, um sie zu erklären.
Alex Martelli
2
@jack, der Code, den Sie jetzt geben, wird sofort beendet (natürlich, weil der Haupt-Thread "vom Ende fällt" und damit alles endet). Es kann also überhaupt nicht nahe an dem liegen, was Sie tatsächlich versuchen, und es ist klar, dass Sie den Code, den Sie veröffentlichen, noch nicht einmal ausprobiert haben. macht es wirklich schwer, dir zu helfen! Bitte geben Sie den Code ein, den Sie versucht haben, und reproduzieren Sie das Problem, das Sie haben, damit ich Ihnen zeigen kann, wo sich Ihr Fehler in diesem Code befindet, anstatt zu erraten, welche Fehler Ihr unsichtbarer Code möglicherweise hat.
Alex Martelli
2
@jack, wie ich bereits erwähnte, wenn das dein Code ist, dann ist dein Hauptfehler jetzt, dass "der Haupt-Thread" vom Ende fällt "und das alles beendet". Probieren Sie den Code aus, den ich oben bereits angegeben habe: while threading.active_count() > 0: time.sleep(0.1)- Warum muss ich das wiederholen?! Sie können es besser machen (das whileim Thread sollte andein globales Flag sein, damit es sauber gestoppt werden kann), aber Sie müssen weitere schreckliche Fehler zuerst beheben: Dazu gehört das Erfassen / Freigeben einer Sperre, die eine neue lokale Variable ist, die genau so ist wie Keine Sperre anstelle einer Sperre, die von allen Threads gemeinsam genutzt wird.
Alex Martelli
1
@AlexMartelli Über die Frage time.sleep()vs time.join(), nimm eine Beute zu meiner Antwort. Es spielt keine Rolle, wie groß der joinTimeout-Wert ist, es ist wichtig, einen anzugeben, damit er reagiert. Meiner Meinung nach scheint die Verwendung eines Sleep-Timers mit einer so kurzen Antwort eine zu hackige Problemumgehung zu sein, um ein Threading-Problem zu lösen.
Konrad Reiche
1
@AlexMartelli Dieser Code hat eine Endlosschleife, da der Hauptthread aktiv bleibt. Dies bedeutet, threading.active_count()dass immer mindestens 1 zurückgegeben wird.
tvervack
16

Es gibt zwei Hauptwege, einen sauberen und einen einfachen.

Der saubere Weg besteht darin, KeyboardInterrupt in Ihrem Hauptthread abzufangen und ein Flag zu setzen, das Ihre Hintergrundthreads überprüfen können, damit sie wissen, dass sie beendet werden. Hier ist eine einfache / etwas unordentliche Version mit einem globalen:

exitapp = False
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        exitapp = True
        raise

def threadCode(...):
    while not exitapp:
        # do work here, watch for exitapp to be True

Die unordentliche, aber einfache Möglichkeit besteht darin, KeyboardInterrupt abzufangen und os._exit () aufzurufen, wodurch alle Threads sofort beendet werden.

Walter Mundt
quelle
Vielen Dank, das hat mich sehr nervt :)
Emil Stenström
5

Ein Arbeiter könnte für Sie hilfreich sein:

#!/usr/bin/env python

import sys, time
from threading import *
from collections import deque

class Worker(object):
    def __init__(self, concurrent=1):
        self.concurrent = concurrent
        self.queue = deque([])
        self.threads = []
        self.keep_interrupt = False

    def _retain_threads(self):
        while len(self.threads) < self.concurrent:
            t = Thread(target=self._run, args=[self])
            t.setDaemon(True)
            t.start()
            self.threads.append(t)


    def _run(self, *args):
        while self.queue and not self.keep_interrupt:
            func, args, kargs = self.queue.popleft()
            func(*args, **kargs)

    def add_task(self, func, *args, **kargs):
        self.queue.append((func, args, kargs))

    def start(self, block=False):
        self._retain_threads()

        if block:
            try:
                while self.threads:
                    self.threads = [t.join(1) or t for t in self.threads if t.isAlive()]
                    if self.queue:
                        self._retain_threads()
            except KeyboardInterrupt:
                self.keep_interrupt = True
                print "alive threads: %d; outstanding tasks: %d" % (len(self.threads), len(self.queue))
                print "terminating..."


# example
print "starting..."
worker = Worker(concurrent=50)

def do_work():
    print "item %d done." % len(items)
    time.sleep(3)

def main():
    for i in xrange(1000):
        worker.add_task(do_work)
    worker.start(True)

main()
print "done."

# to keep shell alive
sys.stdin.readlines()
Mark Ma
quelle
4

Ich würde lieber mit dem in diesem Blog-Beitrag vorgeschlagenen Code gehen :

def main(args):

    threads = []
    for i in range(10):
        t = Worker()
        threads.append(t)
        t.start()

    while len(threads) > 0:
        try:
            # Join all threads using a timeout so it doesn't block
            # Filter out threads which have been joined or are None
            threads = [t.join(1000) for t in threads if t is not None and t.isAlive()]
        except KeyboardInterrupt:
            print "Ctrl-c received! Sending kill to threads..."
            for t in threads:
                t.kill_received = True

Was ich geändert habe, ist der t.join von t.join (1) zu t.join (1000) . Die tatsächliche Anzahl von Sekunden spielt keine Rolle. Wenn Sie keine Timeout-Nummer angeben, reagiert der Haupt-Thread weiterhin auf Strg + C. Die Ausnahme bei KeyboardInterrupt macht die Signalbehandlung expliziter.

Konrad Reiche
quelle
3
Dieser Code wird nach der ersten Schleife unterbrochen, da t.join(1000)der Thread aber nicht zurückgegeben wird None. Daher haben Sie nach der ersten Schleife eine Liste von None's in threads.
tmbo
Dies ist die beste Technik, um darauf zu warten, dass ein Thread verbunden wird, ohne zu warten, aber dennoch einen SIGINT
zuzulassen
1

Wenn Sie einen Thread wie diesen erzeugen - myThread = Thread(target = function)- und dann tun Sie es myThread.start(); myThread.join(). Wenn STRG-C initiiert wird, wird der Hauptthread nicht beendet, da er auf diesen blockierenden myThread.join()Aufruf wartet . Um dies zu beheben, geben Sie einfach eine Zeitüberschreitung für den Aufruf von .join () ein. Das Timeout kann so lange dauern, wie Sie möchten. Wenn Sie möchten, dass es auf unbestimmte Zeit wartet, geben Sie einfach eine sehr lange Zeitüberschreitung ein, z. B. 99999. Es empfiehlt sich auch myThread.daemon = True, alle Threads zu beenden, wenn der Hauptthread (kein Daemon) beendet wird.

Milean
quelle
1
thread1 = threading.Thread(target=your_procedure, args = (arg_1, arg_2))    
try:
      thread1.setDaemon(True)  # very important
      thread1.start()
except (KeyboardInterrupt, SystemExit):
      cleanup_stop_thread()
      sys.exit()

Wenn Sie den Thread beenden möchten, verwenden Sie einfach:

thread1.join(0)
Charles P.
quelle