Python Multithreading warten, bis alle Threads beendet sind

119

Dies wurde möglicherweise in einem ähnlichen Kontext gestellt, aber ich konnte nach etwa 20 Minuten Suche keine Antwort finden, daher werde ich fragen.

Ich habe ein Python-Skript (sagen wir: scriptA.py) und ein Skript (sagen wir scriptB.py) geschrieben.

In scriptB möchte ich scriptA mehrmals mit unterschiedlichen Argumenten aufrufen. Die Ausführung dauert jedes Mal ungefähr eine Stunde (es ist ein riesiges Skript, erledigt viele Dinge. Mach dir keine Sorgen) und ich möchte das ausführen können scriptA mit all den verschiedenen Argumenten gleichzeitig, aber ich muss warten, bis ALLE fertig sind, bevor ich fortfahre; Mein Code:

import subprocess

#setup
do_setup()

#run scriptA
subprocess.call(scriptA + argumentsA)
subprocess.call(scriptA + argumentsB)
subprocess.call(scriptA + argumentsC)

#finish
do_finish()

Ich möchte alle subprocess.call()gleichzeitig laufen und dann warten, bis alle fertig sind. Wie soll ich das tun?

Ich habe versucht , wie das Beispiel zu verwenden Threading hier :

from threading import Thread
import subprocess

def call_script(args)
    subprocess.call(args)

#run scriptA   
t1 = Thread(target=call_script, args=(scriptA + argumentsA))
t2 = Thread(target=call_script, args=(scriptA + argumentsB))
t3 = Thread(target=call_script, args=(scriptA + argumentsC))
t1.start()
t2.start()
t3.start()

Aber ich denke nicht, dass das richtig ist.

Woher weiß ich, dass sie alle fertig sind, bevor sie zu mir gehen do_finish()?

Inbar Rose
quelle

Antworten:

150

Sie müssen die Join- Methode des ThreadObjekts am Ende des Skripts verwenden.

t1 = Thread(target=call_script, args=(scriptA + argumentsA))
t2 = Thread(target=call_script, args=(scriptA + argumentsB))
t3 = Thread(target=call_script, args=(scriptA + argumentsC))

t1.start()
t2.start()
t3.start()

t1.join()
t2.join()
t3.join()

Somit wird der Haupt - Thread warten , bis t1, t2und die t3Ausführung beenden.

Maksim Skurydzin
quelle
5
hmmm - haben Sie Probleme, etwas zu verstehen, werden Sie nicht zuerst t1 ausführen, bis zum Ende warten und dann zu t2..etc usw. gehen? Wie kann alles auf einmal geschehen? Ich sehe nicht, wie dies sie gleichzeitig ausführen würde.
Inbar Rose
25
Der Aufruf von joinblockiert, bis der Thread die Ausführung beendet hat. Sie müssen sowieso auf alle Threads warten. Wenn t1Sie zuerst t2fertig sind, werden Sie mit dem Warten beginnen (was möglicherweise bereits beendet ist und Sie werden sofort darauf warten t3). Wenn t1die Ausführung am längsten gedauert hat, kehren Sie beide zurück t1und t2kehren sofort ohne Blockierung zurück.
Maksim Skurydzin
1
Sie verstehen meine Frage nicht - wenn ich den obigen Code in meinen Code kopiere - wird es funktionieren? oder fehlt mir etwas
Inbar Rose
2
in Ordnung, ich verstehe. Jetzt verstehe ich, war ein bisschen verwirrt darüber, aber ich denke, ich verstehe, joinhängt den aktuellen Prozess irgendwie an den Thread an und wartet, bis er fertig ist, und wenn t2 vor t1 endet, dann prüft es, wenn t1 fertig ist, ob t2 fertig ist das ist es, und dann überprüfen Sie t3..etc..etc .. und dann, wenn alle fertig sind, wird es fortgesetzt. genial.
Inbar Rose
3
Sagen wir, t1 dauert am längsten, aber t2 hat eine Ausnahme. was passiert dann? Kannst du diese Ausnahme abfangen oder prüfen, ob t2 in Ordnung ist oder nicht?
Ciprian Tomoiagă
171

Fügen Sie die Threads in eine Liste ein und verwenden Sie dann die Join-Methode

 threads = []

 t = Thread(...)
 threads.append(t)

 ...repeat as often as necessary...

 # Start all threads
 for x in threads:
     x.start()

 # Wait for all of them to finish
 for x in threads:
     x.join()
Aaron Digulla
quelle
1
Ja, das würde funktionieren, ist aber schwerer zu verstehen. Sie sollten immer versuchen, ein Gleichgewicht zwischen kompaktem Code und "Lesbarkeit" zu finden. Denken Sie daran: Code wird einmal geschrieben, aber oft gelesen. Deshalb ist es wichtiger, dass es leicht zu verstehen ist.
Aaron Digulla
2
Das "Fabrikmuster" kann ich nicht in einem Satz erklären. Google danach und suche stackoverflow.com. Es gibt viele Beispiele und Erklärungen. Kurz gesagt: Sie schreiben Code, der etwas Komplexes für Sie erstellt. Wie eine echte Fabrik: Sie geben eine Bestellung auf und erhalten ein fertiges Produkt zurück.
Aaron Digulla
18
Ich mag die Idee nicht, das Listenverständnis für seine Nebenwirkungen zu verwenden und mit der resultierenden Liste nichts Nützliches zu tun. Eine einfache for-Schleife wäre sauberer, selbst wenn sie zwei Reihen umfasst ...
Ioan Alexandru Cucu
1
@ Aaron DIgull Ich verstehe das. Was ich meine ist, dass ich nur ein for x in threads: x.join()Listenverständnis machen würde, anstatt es zu verwenden
Ioan Alexandru Cucu
1
@ IoanAlexandruCucu: Ich frage mich immer noch, ob es eine lesbarere und effizientere Lösung gibt: stackoverflow.com/questions/21428602/…
Aaron Digulla
29

In Python3 gibt es seit Python 3.2 einen neuen Ansatz, um das gleiche Ergebnis zu erzielen, das ich persönlich dem herkömmlichen Paket zum Erstellen / Starten / Verbinden von Threads vorziehe concurrent.futures: https://docs.python.org/3/library/concurrent.futures .html

Die Verwendung eines ThreadPoolExecutorCodes wäre:

from concurrent.futures.thread import ThreadPoolExecutor
import time

def call_script(ordinal, arg):
    print('Thread', ordinal, 'argument:', arg)
    time.sleep(2)
    print('Thread', ordinal, 'Finished')

args = ['argumentsA', 'argumentsB', 'argumentsC']

with ThreadPoolExecutor(max_workers=2) as executor:
    ordinal = 1
    for arg in args:
        executor.submit(call_script, ordinal, arg)
        ordinal += 1
print('All tasks has been finished')

Die Ausgabe des vorherigen Codes ist ungefähr so:

Thread 1 argument: argumentsA
Thread 2 argument: argumentsB
Thread 1 Finished
Thread 2 Finished
Thread 3 argument: argumentsC
Thread 3 Finished
All tasks has been finished

Einer der Vorteile besteht darin, dass Sie den Durchsatz steuern können, indem Sie die maximale Anzahl gleichzeitiger Mitarbeiter festlegen.

Roberto
quelle
Aber wie können Sie feststellen, wann alle Threads im Threadpool abgeschlossen sind?
Prime By Design
1
Wie Sie im Beispiel sehen können, wird der Code nach der withAnweisung ausgeführt, wenn alle Aufgaben abgeschlossen sind.
Roberto
das funktioniert nicht Versuchen Sie, etwas wirklich Langes in Threads zu tun. Ihre
Druckanweisung
@Pranalee, Dieser Code funktioniert. Ich habe den Code aktualisiert, um die Ausgabezeilen hinzuzufügen. Sie können die "Alle Aufgabe ..." nicht sehen, bevor alle Threads abgeschlossen sind. In withdiesem Fall funktioniert die Anweisung so , wie sie beabsichtigt ist . Auf jeden Fall können Sie jederzeit eine neue Frage in SO öffnen und Ihren Code veröffentlichen, damit wir Ihnen helfen können, herauszufinden, was in Ihrem Fall passiert.
Roberto
@PrimeByDesign Sie können concurrent.futures.waitFunktion verwenden, Sie können ein echtes Beispiel hier sehen Offizielle Dokumente: docs.python.org/3/library/…
Alexander Fortin
28

Ich bevorzuge das Listenverständnis basierend auf einer Eingabeliste:

inputs = [scriptA + argumentsA, scriptA + argumentsB, ...]
threads = [Thread(target=call_script, args=(i)) for i in inputs]
[t.start() for t in threads]
[t.join() for t in threads]
Adam Matan
quelle
Die überprüfte Antwort erklärt sich gut, aber diese ist kürzer und erfordert keine hässlichen Wiederholungen. Nur eine schöne Antwort. :)
Tleb
Das Listenverständnis nur für Nebenwirkungen wird normalerweise abgeschrieben *. In diesem Anwendungsfall scheint dies jedoch eine gute Idee zu sein. * stackoverflow.com/questions/5753597/…
Vinayak Kaniyarakkal
3
@ VinayakKaniyarakkal for t in threads:t.start()ist es nicht besser?
SmartManoj
5

Sie können eine Klasse wie die folgende haben, aus der Sie 'n' Funktionen oder Konsolenskripte hinzufügen können, die Sie parallel ausführen möchten, die Ausführung starten und warten, bis alle Jobs abgeschlossen sind.

from multiprocessing import Process

class ProcessParallel(object):
    """
    To Process the  functions parallely

    """    
    def __init__(self, *jobs):
        """
        """
        self.jobs = jobs
        self.processes = []

    def fork_processes(self):
        """
        Creates the process objects for given function deligates
        """
        for job in self.jobs:
            proc  = Process(target=job)
            self.processes.append(proc)

    def start_all(self):
        """
        Starts the functions process all together.
        """
        for proc in self.processes:
            proc.start()

    def join_all(self):
        """
        Waits untill all the functions executed.
        """
        for proc in self.processes:
            proc.join()


def two_sum(a=2, b=2):
    return a + b

def multiply(a=2, b=2):
    return a * b


#How to run:
if __name__ == '__main__':
    #note: two_sum, multiply can be replace with any python console scripts which
    #you wanted to run parallel..
    procs =  ProcessParallel(two_sum, multiply)
    #Add all the process in list
    procs.fork_processes()
    #starts  process execution 
    procs.start_all()
    #wait until all the process got executed
    procs.join_all()
PBD
quelle
Dies ist Mehrfachverarbeitung. Die Frage betraf docs.python.org/3/library/threading.html
Rustam A.
3

Aus der threading Moduldokumentation

Es gibt ein "Haupt-Thread" -Objekt. Dies entspricht dem anfänglichen Kontroll-Thread im Python-Programm. Es ist kein Daemon-Thread.

Es besteht die Möglichkeit, dass "Dummy-Thread-Objekte" erstellt werden. Hierbei handelt es sich um Thread-Objekte, die „Alien-Threads“ entsprechen. Hierbei handelt es sich um Steuerungs-Threads, die außerhalb des Threading-Moduls gestartet wurden, z. B. direkt aus C-Code. Dummy-Thread-Objekte haben eine eingeschränkte Funktionalität. Sie gelten immer als lebendig und dämonisch und können nicht bearbeitet werden join(). Sie werden niemals gelöscht, da die Beendigung von Alien-Threads nicht erkannt werden kann.

So fangen Sie diese beiden Fälle ab, wenn Sie nicht daran interessiert sind, eine Liste der von Ihnen erstellten Threads zu führen:

import threading as thrd


def alter_data(data, index):
    data[index] *= 2


data = [0, 2, 6, 20]

for i, value in enumerate(data):
    thrd.Thread(target=alter_data, args=[data, i]).start()

for thread in thrd.enumerate():
    if thread.daemon:
        continue
    try:
        thread.join()
    except RuntimeError as err:
        if 'cannot join current thread' in err.args[0]:
            # catchs main thread
            continue
        else:
            raise

Worauf:

>>> print(data)
[0, 4, 12, 40]
berna1111
quelle
2

Vielleicht so etwas wie

for t in threading.enumerate():
    if t.daemon:
        t.join()
jno
quelle
Ich habe diesen Code ausprobiert, bin mir aber nicht sicher, ob er funktioniert, da die letzte Anweisung meines Codes gedruckt wurde, die danach für die for-Schleife war und der Prozess immer noch nicht beendet wurde.
Omkar
1

Ich bin gerade auf dasselbe Problem gestoßen, bei dem ich auf alle Threads warten musste, die mit der for-Schleife erstellt wurden. Ich habe gerade den folgenden Code ausprobiert. Es ist möglicherweise nicht die perfekte Lösung, aber ich dachte, es wäre eine einfache Lösung zu testen:

for t in threading.enumerate():
    try:
        t.join()
    except RuntimeError as err:
        if 'cannot join current thread' in err:
            continue
        else:
            raise
Omkar
quelle