Wie stoppe ich einen sich wiederholenden Thread in Python?

73

Was ist der richtige Weg, um einem Loop-Thread anzuweisen, das Looping zu beenden?

Ich habe ein ziemlich einfaches Programm, das einen bestimmten Host in einer separaten threading.ThreadKlasse anpingt . In dieser Klasse schläft es 60 Sekunden, das wird erneut ausgeführt, bis die Anwendung beendet wird.

Ich möchte einen 'Stop'-Button in meinem implementieren wx.Frame, um den Loop-Thread zum Stoppen aufzufordern. Es muss den Thread nicht sofort beenden, es kann einfach aufhören zu schleifen, sobald es aufwacht.

Hier ist meine threadingKlasse (Hinweis: Ich habe noch keine Schleife implementiert, aber sie würde wahrscheinlich unter die Ausführungsmethode in PingAssets fallen.)

class PingAssets(threading.Thread):
    def __init__(self, threadNum, asset, window):
        threading.Thread.__init__(self)
        self.threadNum = threadNum
        self.window = window
        self.asset = asset

    def run(self):
        config = controller.getConfig()
        fmt = config['timefmt']
        start_time = datetime.now().strftime(fmt)
        try:
            if onlinecheck.check_status(self.asset):
                status = "online"
            else:
                status = "offline"
        except socket.gaierror:
            status = "an invalid asset tag."
        msg =("{}: {} is {}.   \n".format(start_time, self.asset, status))
        wx.CallAfter(self.window.Logger, msg)

Und in meinem wxPyhton-Frame habe ich diese Funktion über eine Start-Schaltfläche aufgerufen:

def CheckAsset(self, asset):
        self.count += 1
        thread = PingAssets(self.count, asset, self)
        self.threads.append(thread)
        thread.start()
pedram
quelle

Antworten:

113

Stoppbare Funktion mit Gewinde

Anstelle einer Unterklasse threading.Threadkann die Funktion so geändert werden, dass ein Flag angehalten werden kann.

Wir brauchen ein Objekt, auf das die laufende Funktion zugreifen kann, für das wir das Flag so einstellen, dass es nicht mehr läuft.

Wir können threading.currentThread()Objekt verwenden.

import threading
import time


def doit(arg):
    t = threading.currentThread()
    while getattr(t, "do_run", True):
        print ("working on %s" % arg)
        time.sleep(1)
    print("Stopping as you wish.")


def main():
    t = threading.Thread(target=doit, args=("task",))
    t.start()
    time.sleep(5)
    t.do_run = False
    t.join()

if __name__ == "__main__":
    main()

Der Trick ist, dass der laufende Thread zusätzliche Eigenschaften haben kann. Die Lösung basiert auf Annahmen:

  • Der Thread hat eine Eigenschaft "do_run" mit dem Standardwert True
  • Der übergeordnete Prozess kann dem gestarteten Thread die Eigenschaft "do_run" zuweisen False.

Wenn wir den Code ausführen, erhalten wir folgende Ausgabe:

$ python stopthread.py                                                        
working on task
working on task
working on task
working on task
working on task
Stopping as you wish.

Pille zum Töten - mit Event

Eine andere Alternative ist die Verwendung threading.Eventals Funktionsargument. Dies ist standardmäßig der Fall False, aber der externe Prozess kann es "setzen" (auf True) und die Funktion kann mithilfe der wait(timeout)Funktion mehr darüber erfahren .

Wir können ohne waitZeitüberschreitung, aber wir können es auch als Schlaf-Timer verwenden (siehe unten).

def doit(stop_event, arg):
    while not stop_event.wait(1):
        print ("working on %s" % arg)
    print("Stopping as you wish.")


def main():
    pill2kill = threading.Event()
    t = threading.Thread(target=doit, args=(pill2kill, "task"))
    t.start()
    time.sleep(5)
    pill2kill.set()
    t.join()

Bearbeiten: Ich habe dies in Python 3.6 versucht. stop_event.wait()blockiert das Ereignis (und damit die while-Schleife) bis zur Freigabe. Es wird kein boolescher Wert zurückgegeben. Verwenden Sie stop_event.is_set()stattdessen Werke.

Mehrere Fäden mit einer Pille stoppen

Der Vorteil der Pille zu töten ist besser zu sehen, wenn wir mehrere Fäden gleichzeitig stoppen müssen, da eine Pille für alle funktioniert.

Das doitwird sich überhaupt nicht ändern, nur das mainbehandelt die Threads etwas anders.

def main():
    pill2kill = threading.Event()
    tasks = ["task ONE", "task TWO", "task THREE"]

    def thread_gen(pill2kill, tasks):
        for task in tasks:
            t = threading.Thread(target=doit, args=(pill2kill, task))
            yield t

    threads = list(thread_gen(pill2kill, tasks))
    for thread in threads:
        thread.start()
    time.sleep(5)
    pill2kill.set()
    for thread in threads:
        thread.join()
Jan Vlcinsky
quelle
2
gute Antwort! hat mir sehr geholfen
Benny
Es gibt ein Problem: Stellen Sie sich vor, wir setzen ein Kernelsignal wie SIGALRM und möchten im Handler des Signals den Prozess und die Threads mit Ihrer Methode (pill2kill.set und dann join) und dann mit sys.exit (0) stoppen. 1) Wenn Sie die Anwendung ausführen und n Sekunden warten, funktioniert sie einwandfrei. 2) Wenn Sie Strg + c drücken, funktioniert sie einwandfrei. 3) Wenn Sie jedoch Strg + z drücken und dann einige Sekunden warten und dann "fg" auf Setzen Sie den Prozess fort. Wenn die n Sekunden des SIGALRM verstrichen sind, wird der Prozess gestoppt, aber die Threads arbeiten einige Millisekunden lang weiter. Ich habe einen Code, um das zu beweisen. Hast du eine Idee dafür?
KOrrosh Sh
Pille zum Töten ist das, wonach ich gesucht habe, da ich ThreadPoolExecutor verwende und nur Futures anstelle von Threads habe, um einige Attribute wiedo_run
Genarito
Wenn Sie nicht warten möchten, können Sie auch is_set anstelle eines waitTimeouts mit 0 verwenden
Genarito
29

Dies wurde zuvor auf Stack gefragt. Siehe die folgenden Links:

Grundsätzlich müssen Sie den Thread nur mit einer Stoppfunktion einrichten, die einen Sentinel-Wert festlegt, den der Thread überprüft. In Ihrem Fall muss das Element in Ihrer Schleife den Sentinel-Wert überprüfen, um festzustellen, ob er geändert wurde. Wenn dies der Fall ist, kann die Schleife brechen und der Thread kann absterben.

Mike Driscoll
quelle
14

Ich habe die anderen Fragen zu Stack gelesen, war aber immer noch ein wenig verwirrt über die Kommunikation zwischen den Klassen. So bin ich damit umgegangen:

Ich verwende eine Liste, um alle meine Threads in der __init__Methode meiner wxFrame-Klasse zu speichern:self.threads = []

Wie unter Wie stoppt man einen sich wiederholenden Thread in Python? Ich verwende ein Signal in meiner Thread-Klasse, das Truebeim Initialisieren der Threading-Klasse auf gesetzt wird.

class PingAssets(threading.Thread):
    def __init__(self, threadNum, asset, window):
        threading.Thread.__init__(self)
        self.threadNum = threadNum
        self.window = window
        self.asset = asset
        self.signal = True

    def run(self):
        while self.signal:
             do_stuff()
             sleep()

und ich kann diese Threads stoppen, indem ich über meine Threads iteriere:

def OnStop(self, e):
        for t in self.threads:
            t.signal = False
pedram
quelle
2

Ich hatte einen anderen Ansatz. Ich habe eine Thread-Klasse unterklassifiziert und im Konstruktor ein Ereignisobjekt erstellt. Dann habe ich eine benutzerdefinierte join () -Methode geschrieben, die zuerst dieses Ereignis setzt und dann die Version eines Elternteils von sich selbst aufruft.

Hier ist meine Klasse, die ich für die Kommunikation über die serielle Schnittstelle in der wxPython-App verwende:

import wx, threading, serial, Events, Queue

class PumpThread(threading.Thread):

    def __init__ (self, port, queue, parent):
        super(PumpThread, self).__init__()
        self.port = port
        self.queue = queue
        self.parent = parent

        self.serial = serial.Serial()
        self.serial.port = self.port
        self.serial.timeout = 0.5
        self.serial.baudrate = 9600
        self.serial.parity = 'N'

        self.stopRequest = threading.Event()

    def run (self):
        try:
            self.serial.open()
        except Exception, ex:
            print ("[ERROR]\tUnable to open port {}".format(self.port))
            print ("[ERROR]\t{}\n\n{}".format(ex.message, ex.traceback))
            self.stopRequest.set()
        else:
            print ("[INFO]\tListening port {}".format(self.port))
            self.serial.write("FLOW?\r")

        while not self.stopRequest.isSet():
            msg = ''
            if not self.queue.empty():
                try:
                    command = self.queue.get()
                    self.serial.write(command)
                except Queue.Empty:
                    continue

            while self.serial.inWaiting():
                char = self.serial.read(1)
                if '\r' in char and len(msg) > 1:
                    char = ''
                    #~ print('[DATA]\t{}'.format(msg))
                    event = Events.PumpDataEvent(Events.SERIALRX, wx.ID_ANY, msg)
                    wx.PostEvent(self.parent, event)
                    msg = ''
                    break
                msg += char
        self.serial.close()

    def join (self, timeout=None):
        self.stopRequest.set()
        super(PumpThread, self).join(timeout)

    def SetPort (self, serial):
        self.serial = serial

    def Write (self, msg):
        if self.serial.is_open:
            self.queue.put(msg)
        else:
            print("[ERROR]\tPort {} is not open!".format(self.port))

    def Stop(self):
        if self.isAlive():
            self.join()

Die Warteschlange wird zum Senden von Nachrichten an den Port verwendet, und die Hauptschleife nimmt Antworten zurück. Ich habe keine serial.readline () -Methode verwendet, da die Endzeilenzeichen unterschiedlich sind, und ich habe festgestellt, dass die Verwendung von Io-Klassen zu viel Aufhebens macht.

Piotr Sawicki
quelle
0

Hängt davon ab, was Sie in diesem Thread ausführen. Wenn dies Ihr Code ist, können Sie eine Stoppbedingung implementieren (siehe andere Antworten).

Wenn Sie jedoch den Code eines anderen ausführen möchten, sollten Sie einen Prozess abspalten und starten. So was:

import multiprocessing
proc = multiprocessing.Process(target=your_proc_function, args=())
proc.start()

Wenn Sie diesen Vorgang jetzt stoppen möchten, senden Sie ihm ein SIGTERM wie folgt:

proc.terminate()
proc.join()

Und es ist nicht langsam: Sekundenbruchteile. Genießen :)

kolypto
quelle
0

Meine Lösung ist:

import threading, time

def a():
    t = threading.currentThread()
    while getattr(t, "do_run", True):
        print('a')
        time.sleep(1)

threading.Thread(target=a, name='228').start()

def getThreadByName(name):
    flex = threading.enumerate()
    for i in range(0, len(flex)):
        thread = str(flex[i])
        if thread.startswith('<Thread'):
            tname = thread.split('(')[1].split(',')[0]
            if tname == name:
                return flex[i]
    return 'none'

t = getThreadByName('228')
time.sleep(5)
t.do_run = False
t.join()
CodeYou
quelle
1
Fügen Sie # hinter oder unter dem Code hinzu und erklären Sie, wofür die Zeile gedacht ist. Nur das Dumping von Code hält uns "nicht schlau" ;-) Tipp: Machen Sie Ihre Antwort zu einer funktionierenden Lösung, die andere in ihren Interpreter einfügen können. Das Hinzufügen von zusätzlichem Code dazu ist laut. Willkommen und genieße SO. Ende der Überprüfung.
ZF007