Wie erhalte ich den Rückgabewert von einem Thread in Python?

342

Die folgende Funktion foogibt eine Zeichenfolge zurück 'foo'. Wie kann ich den Wert erhalten, 'foo'der vom Ziel des Threads zurückgegeben wird?

from threading import Thread

def foo(bar):
    print('hello {}'.format(bar))
    return 'foo'

thread = Thread(target=foo, args=('world!',))
thread.start()
return_value = thread.join()

Der oben gezeigte "eine offensichtliche Weg, dies zu tun" funktioniert nicht: thread.join()zurückgegeben None.

wim
quelle

Antworten:

36

In Python 3.2+ bietet das stdlib- concurrent.futuresModul eine API auf höherer Ebene threading, einschließlich der Übergabe von Rückgabewerten oder Ausnahmen von einem Arbeitsthread an den Hauptthread:

import concurrent.futures

def foo(bar):
    print('hello {}'.format(bar))
    return 'foo'

with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(foo, 'world!')
    return_value = future.result()
    print(return_value)
Ramarao Amara
quelle
Für diejenigen, die sich fragen, kann dies mit einer Liste von Threads erfolgen. futures = [executor.submit(foo, param) for param in param_list]Die Reihenfolge wird beibehalten, und das Verlassen der Reihenfolge withermöglicht die Ergebniserfassung. [f.result() for f in futures]
jayreed1
273

FWIW, das multiprocessingModul hat dafür eine schöne Schnittstelle mit der PoolKlasse. Und wenn Sie sich eher an Threads als an Prozesse halten möchten, können Sie die multiprocessing.pool.ThreadPoolKlasse einfach als Drop-In-Ersatz verwenden.

def foo(bar, baz):
  print 'hello {0}'.format(bar)
  return 'foo' + baz

from multiprocessing.pool import ThreadPool
pool = ThreadPool(processes=1)

async_result = pool.apply_async(foo, ('world', 'foo')) # tuple of args for foo

# do some other stuff in the main process

return_val = async_result.get()  # get the return value from your function.
Jake Biesinger
quelle
50
@JakeBiesinger Mein Punkt ist, dass ich nach einer Antwort gesucht habe, wie ich eine Antwort von Thread bekomme, hierher gekommen bin und eine akzeptierte Antwort die angegebene Frage nicht beantwortet. Ich unterscheide Threads und Prozesse. Ich kenne Global Interpreter Lock, arbeite jedoch an einem E / A-gebundenen Problem, sodass Threads in Ordnung sind und ich keine Prozesse benötige. Andere Antworten hier besser beantworten Frage angegeben.
Omikron
7
@omikron Threads in Python geben jedoch keine Antwort zurück, es sei denn, Sie verwenden eine Unterklasse, die diese Funktionalität aktiviert. Von den möglichen Unterklassen sind ThreadPools eine gute Wahl (wählen Sie die Anzahl der Threads, verwenden Sie map / apply w / sync / async). Obwohl sie aus importiert werden multiprocess, haben sie nichts mit Prozessen zu tun.
Jake Biesinger
4
@ JakeBiesinger Oh, ich bin blind. Entschuldigung für meine unnötigen Kommentare. Du hast recht. Ich habe gerade angenommen, dass Multiprocessing = Prozesse.
Omikron
12
Vergessen Sie nicht, auf processes=1mehr als einen zu setzen , wenn Sie mehr Threads haben!
Iman
4
Das Problem bei der Mehrfachverarbeitung und dem Thread-Pool besteht darin, dass das Einrichten und Starten von Threads im Vergleich zur Basis-Threading-Bibliothek viel langsamer ist. Es ist großartig, um lang laufende Threads zu starten, aber den Zweck zu umgehen, wenn viele kurz laufende Threads gestartet werden müssen. Die in anderen Antworten dokumentierte Lösung der Verwendung von "Threading" und "Queue" ist meiner Meinung nach eine bessere Alternative für diesen letzteren Anwendungsfall.
Yves Dorfsman
242

Eine Möglichkeit, die ich gesehen habe, besteht darin, ein veränderbares Objekt wie eine Liste oder ein Wörterbuch zusammen mit einem Index oder einer anderen Kennung an den Konstruktor des Threads zu übergeben. Der Thread kann dann seine Ergebnisse in seinem dedizierten Steckplatz in diesem Objekt speichern. Zum Beispiel:

def foo(bar, result, index):
    print 'hello {0}'.format(bar)
    result[index] = "foo"

from threading import Thread

threads = [None] * 10
results = [None] * 10

for i in range(len(threads)):
    threads[i] = Thread(target=foo, args=('world!', results, i))
    threads[i].start()

# do some other stuff

for i in range(len(threads)):
    threads[i].join()

print " ".join(results)  # what sound does a metasyntactic locomotive make?

Wenn Sie join()den Rückgabewert der aufgerufenen Funktion wirklich zurückgeben möchten , können Sie dies mit einer ThreadUnterklasse wie der folgenden tun :

from threading import Thread

def foo(bar):
    print 'hello {0}'.format(bar)
    return "foo"

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs={}, Verbose=None):
        Thread.__init__(self, group, target, name, args, kwargs, Verbose)
        self._return = None
    def run(self):
        if self._Thread__target is not None:
            self._return = self._Thread__target(*self._Thread__args,
                                                **self._Thread__kwargs)
    def join(self):
        Thread.join(self)
        return self._return

twrv = ThreadWithReturnValue(target=foo, args=('world!',))

twrv.start()
print twrv.join()   # prints foo

Das wird ein wenig haarig, weil ein Name verwirrt wird, und es greift auf "private" Datenstrukturen zu, die für die ThreadImplementierung spezifisch sind ... aber es funktioniert.

Für Python3

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs={}, Verbose=None):
        Thread.__init__(self, group, target, name, args, kwargs)
        self._return = None
    def run(self):
        print(type(self._target))
        if self._target is not None:
            self._return = self._target(*self._args,
                                                **self._kwargs)
    def join(self, *args):
        Thread.join(self, *args)
        return self._return
irgendwie
quelle
37
cool, danke für das Beispiel! Ich frage mich, warum Thread überhaupt nicht mit der Behandlung eines Rückgabewerts implementiert wurde. Es scheint offensichtlich genug zu sein, um dies zu unterstützen.
wim
16
Ich denke, dies sollte die akzeptierte Antwort sein - das OP hat darum gebeten threading, keine andere Bibliothek zu versuchen, und die Begrenzung der Poolgröße führt zu einem zusätzlichen potenziellen Problem, das in meinem Fall aufgetreten ist.
Domoarigato
10
Toller Zugwitz.
Meawoppl
7
Auf Python3 kehrt dies zurück TypeError: __init__() takes from 1 to 6 positional arguments but 7 were given. Wie kann man das beheben?
GuySoft
2
Warnung für jeden, der versucht ist, das zweite (das _Thread__targetDing) zu tun . Sie werden jeden, der versucht, Ihren Code auf Python 3 zu portieren, dazu bringen, Sie zu hassen, bis er herausgefunden hat, was Sie getan haben (aufgrund der Verwendung von undokumentierten Funktionen, die sich zwischen 2 und 3 geändert haben). Dokumentieren Sie Ihren Code gut.
Ben Taylor
84

Jakes Antwort ist gut, aber wenn Sie keinen Threadpool verwenden möchten (Sie wissen nicht, wie viele Threads Sie benötigen, sondern sie nach Bedarf erstellen), ist die integrierte Methode eine gute Möglichkeit, Informationen zwischen Threads zu übertragen Queue.Queue- Klasse, da sie Thread-Sicherheit bietet.

Ich habe den folgenden Dekorator erstellt, damit er sich ähnlich wie der Threadpool verhält:

def threaded(f, daemon=False):
    import Queue

    def wrapped_f(q, *args, **kwargs):
        '''this function calls the decorated function and puts the 
        result in a queue'''
        ret = f(*args, **kwargs)
        q.put(ret)

    def wrap(*args, **kwargs):
        '''this is the function returned from the decorator. It fires off
        wrapped_f in a new thread and returns the thread object with
        the result queue attached'''

        q = Queue.Queue()

        t = threading.Thread(target=wrapped_f, args=(q,)+args, kwargs=kwargs)
        t.daemon = daemon
        t.start()
        t.result_queue = q        
        return t

    return wrap

Dann verwenden Sie es einfach als:

@threaded
def long_task(x):
    import time
    x = x + 5
    time.sleep(5)
    return x

# does not block, returns Thread object
y = long_task(10)
print y

# this blocks, waiting for the result
result = y.result_queue.get()
print result

Die dekorierte Funktion erstellt bei jedem Aufruf einen neuen Thread und gibt ein Thread-Objekt zurück, das die Warteschlange enthält, die das Ergebnis empfängt.

AKTUALISIEREN

Es ist schon eine Weile her, dass ich diese Antwort gepostet habe, aber es werden immer noch Ansichten angezeigt, sodass ich dachte, ich würde sie aktualisieren, um die Art und Weise widerzuspiegeln, wie ich dies in neueren Versionen von Python mache:

Python 3.2 wurde in das concurrent.futuresModul aufgenommen, das eine allgemeine Schnittstelle für parallele Aufgaben bietet. Es bietet ThreadPoolExecutorund ProcessPoolExecutor, so können Sie einen Thread oder Prozesspool mit derselben API verwenden.

Ein Vorteil dieser api ist , dass eine Aufgabe zu einer Vorlage Executorkehrt ein FutureObjekt, das mit dem Rückgabewert der abrufbaren vervollständigen Sie einreichen.

Dies macht das Anbringen eines queueObjekts unnötig, was den Dekorateur erheblich vereinfacht:

_DEFAULT_POOL = ThreadPoolExecutor()

def threadpool(f, executor=None):
    @wraps(f)
    def wrap(*args, **kwargs):
        return (executor or _DEFAULT_POOL).submit(f, *args, **kwargs)

    return wrap

Dies verwendet einen Standard- Modul- Threadpool-Executor, wenn einer nicht übergeben wird.

Die Verwendung ist sehr ähnlich wie zuvor:

@threadpool
def long_task(x):
    import time
    x = x + 5
    time.sleep(5)
    return x

# does not block, returns Future object
y = long_task(10)
print y

# this blocks, waiting for the result
result = y.result()
print result

Wenn Sie Python 3.4+ verwenden, ist eine wirklich nette Funktion bei der Verwendung dieser Methode (und zukünftiger Objekte im Allgemeinen), dass die zurückgegebene Zukunft umbrochen werden kann, um sie in eine asyncio.Futuremit zu verwandeln asyncio.wrap_future. Dies macht es einfach mit Coroutinen zu arbeiten:

result = await asyncio.wrap_future(long_task(10))

Wenn Sie keinen Zugriff auf das zugrunde liegende concurrent.FutureObjekt benötigen , können Sie den Wrap in den Dekorator aufnehmen:

_DEFAULT_POOL = ThreadPoolExecutor()

def threadpool(f, executor=None):
    @wraps(f)
    def wrap(*args, **kwargs):
        return asyncio.wrap_future((executor or _DEFAULT_POOL).submit(f, *args, **kwargs))

    return wrap

Wenn Sie dann den CPU-intensiven oder blockierenden Code aus dem Ereignisschleifen-Thread entfernen müssen, können Sie ihn in eine dekorierte Funktion einfügen:

@threadpool
def some_long_calculation():
    ...

# this will suspend while the function is executed on a threadpool
result = await some_long_calculation()
bj0
quelle
Ich kann das scheinbar nicht zum Laufen bringen. Ich erhalte eine Fehlermeldung, dass AttributeError: 'module' object has no attribute 'Lock'dies anscheinend von der Zeile ausgeht y = long_task(10)... Gedanken?
Sadmicrowave
1
Der Code verwendet Lock nicht explizit, daher könnte das Problem an einer anderen Stelle in Ihrem Code liegen. Vielleicht möchten Sie eine neue SO-Frage dazu stellen
bj0
Warum ist result_queue ein Instanzattribut? Wäre es besser, wenn es ein Klassenattribut wäre, damit Benutzer nicht wissen müssen, wie er result_queue aufruft, wenn sie @threaded verwenden, das nicht explizit und mehrdeutig ist?
Nonbot
@ t88, nicht sicher, was Sie meinen, Sie benötigen eine Möglichkeit, auf das Ergebnis zuzugreifen, was bedeutet, dass Sie wissen müssen, wie Sie anrufen sollen. Wenn Sie möchten, dass es sich um etwas anderes handelt, können Sie Thread unterordnen und das tun, was Sie möchten (dies war eine einfache Lösung). Der Grund, warum die Warteschlange an den Thread angehängt werden muss, ist, dass mehrere Aufrufe / Funktionen ihre eigenen Warteschlangen haben
bj0
1
Das ist brilliant! Vielen Dank.
Ganesh Kathiresan
53

Eine weitere Lösung, bei der Sie Ihren vorhandenen Code nicht ändern müssen:

import Queue
from threading import Thread

def foo(bar):
    print 'hello {0}'.format(bar)
    return 'foo'

que = Queue.Queue()

t = Thread(target=lambda q, arg1: q.put(foo(arg1)), args=(que, 'world!'))
t.start()
t.join()
result = que.get()
print result

Es kann auch leicht an eine Multithread-Umgebung angepasst werden:

import Queue
from threading import Thread

def foo(bar):
    print 'hello {0}'.format(bar)
    return 'foo'

que = Queue.Queue()
threads_list = list()

t = Thread(target=lambda q, arg1: q.put(foo(arg1)), args=(que, 'world!'))
t.start()
threads_list.append(t)

# Add more threads here
...
threads_list.append(t2)
...
threads_list.append(t3)
...

# Join all the threads
for t in threads_list:
    t.join()

# Check thread's return value
while not que.empty():
    result = que.get()
    print result
Arik
quelle
t = Thread (Ziel = Lambda q, arg1: q.put (foo (arg1)), args = (que, 'world!')) Was macht q.put hier, was macht die Queue.Queue ()
vijay Shanker
6
Es sollte eine Statue von dir in deiner Heimatstadt geben, danke!
Onilol
3
@Onilol - Vielen Dank. Ihr Kommentar ist genau der Grund, warum ich das tue :)
Arik
4
Für Python3 müssen Sie zu ändern from queue import Queue.
Gino Mempin
1
Dies scheint die am wenigsten störende Methode zu sein (es ist nicht erforderlich, die ursprüngliche Codebasis dramatisch umzustrukturieren), damit der Rückgabewert zum Hauptthread zurückkehrt.
Fanchen Bao
24

Parris / kindalls Antwort join / returnAntwort auf Python 3 portiert:

from threading import Thread

def foo(bar):
    print('hello {0}'.format(bar))
    return "foo"

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None):
        Thread.__init__(self, group, target, name, args, kwargs, daemon=daemon)

        self._return = None

    def run(self):
        if self._target is not None:
            self._return = self._target(*self._args, **self._kwargs)

    def join(self):
        Thread.join(self)
        return self._return


twrv = ThreadWithReturnValue(target=foo, args=('world!',))

twrv.start()
print(twrv.join())   # prints foo

Beachten Sie, dass die ThreadKlasse in Python 3 anders implementiert ist.

GuySoft
quelle
1
Join benötigt einen Timeout-Parameter, der weitergegeben werden sollte
cz
22

Ich habe die Antwort von kindall gestohlen und sie nur ein wenig aufgeräumt.

Der Schlüsselteil ist das Hinzufügen von * args und ** kwargs zu join (), um das Timeout zu behandeln

class threadWithReturn(Thread):
    def __init__(self, *args, **kwargs):
        super(threadWithReturn, self).__init__(*args, **kwargs)

        self._return = None

    def run(self):
        if self._Thread__target is not None:
            self._return = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)

    def join(self, *args, **kwargs):
        super(threadWithReturn, self).join(*args, **kwargs)

        return self._return

AKTUALISIERTE ANTWORT UNTEN

Dies ist meine am häufigsten gewählte Antwort, daher habe ich beschlossen, mit Code zu aktualisieren, der sowohl auf py2 als auch auf py3 ausgeführt wird.

Außerdem sehe ich viele Antworten auf diese Frage, die ein Unverständnis in Bezug auf Thread.join () zeigen. Einige können das timeoutArgument überhaupt nicht verarbeiten. Es gibt aber auch einen Eckfall, den Sie in Bezug auf Fälle kennen sollten, in denen Sie (1) eine Zielfunktion haben, die zurückkehren kann, Noneund (2) Sie auch das timeoutArgument an join () übergeben. Bitte lesen Sie "TEST 4", um diesen Eckfall zu verstehen.

ThreadWithReturn-Klasse, die mit py2 und py3 funktioniert:

import sys
from threading import Thread
from builtins import super    # https://stackoverflow.com/a/30159479

if sys.version_info >= (3, 0):
    _thread_target_key = '_target'
    _thread_args_key = '_args'
    _thread_kwargs_key = '_kwargs'
else:
    _thread_target_key = '_Thread__target'
    _thread_args_key = '_Thread__args'
    _thread_kwargs_key = '_Thread__kwargs'

class ThreadWithReturn(Thread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._return = None

    def run(self):
        target = getattr(self, _thread_target_key)
        if not target is None:
            self._return = target(
                *getattr(self, _thread_args_key),
                **getattr(self, _thread_kwargs_key)
            )

    def join(self, *args, **kwargs):
        super().join(*args, **kwargs)
        return self._return

Einige Beispieltests sind unten gezeigt:

import time, random

# TEST TARGET FUNCTION
def giveMe(arg, seconds=None):
    if not seconds is None:
        time.sleep(seconds)
    return arg

# TEST 1
my_thread = ThreadWithReturn(target=giveMe, args=('stringy',))
my_thread.start()
returned = my_thread.join()
# (returned == 'stringy')

# TEST 2
my_thread = ThreadWithReturn(target=giveMe, args=(None,))
my_thread.start()
returned = my_thread.join()
# (returned is None)

# TEST 3
my_thread = ThreadWithReturn(target=giveMe, args=('stringy',), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=2)
# (returned is None) # because join() timed out before giveMe() finished

# TEST 4
my_thread = ThreadWithReturn(target=giveMe, args=(None,), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=random.randint(1, 10))

Können Sie den Eckfall identifizieren, auf den wir möglicherweise bei TEST 4 stoßen?

Das Problem ist, dass wir erwarten, dass giveMe () None zurückgibt (siehe TEST 2), aber wir erwarten auch, dass join () None zurückgibt, wenn das Zeitlimit überschritten wird.

returned is None bedeutet entweder:

(1) das hat giveMe () zurückgegeben, oder

(2) Zeitüberschreitung bei join ()

Dieses Beispiel ist trivial, da wir wissen, dass giveMe () immer None zurückgibt. In der realen Welt (in der das Ziel möglicherweise keine oder etwas anderes zurückgibt) möchten wir jedoch explizit überprüfen, was passiert ist.

Im Folgenden erfahren Sie, wie Sie diesen Eckfall angehen können:

# TEST 4
my_thread = ThreadWithReturn(target=giveMe, args=(None,), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=random.randint(1, 10))

if my_thread.isAlive():
    # returned is None because join() timed out
    # this also means that giveMe() is still running in the background
    pass
    # handle this based on your app's logic
else:
    # join() is finished, and so is giveMe()
    # BUT we could also be in a race condition, so we need to update returned, just in case
    returned = my_thread.join()
user2426679
quelle
Kennen Sie das _Thread_target-Äquivalent für Python3? Dieses Attribut ist in Python3 nicht vorhanden.
GreySage
Ich habe in der Datei threading.py nachgesehen, es stellt sich heraus, dass es sich um _target handelt (andere Attribute haben einen ähnlichen Namen).
GreySage
Sie können den Zugriff auf die privaten Variablen der Thread-Klasse vermeiden, wenn Sie die Argumente ,, und speichern target, um sie als Mitgliedsvariablen in Ihrer Klasse zu initiieren . argskwargs
Tolli
@GreySage Siehe meine Antwort, ich habe diesen Block auf Python3 unten portiert
GuySoft
@ GreySage Antwort unterstützt jetzt py2 und py3
user2426679
15

Warteschlange verwenden:

import threading, queue

def calc_square(num, out_queue1):
  l = []
  for x in num:
    l.append(x*x)
  out_queue1.put(l)


arr = [1,2,3,4,5,6,7,8,9,10]
out_queue1=queue.Queue()
t1=threading.Thread(target=calc_square, args=(arr,out_queue1))
t1.start()
t1.join()
print (out_queue1.get())
user341143
quelle
1
Wirklich wie diese Sollution, kurz und bündig. Wenn Ihre Funktion eine Eingabewarteschlange liest und Sie diese hinzufügen, müssen out_queue1Sie die Warteschlange out_queue1.get()durchlaufen und abfangen. Leere Ausnahme : ret = [] ; try: ; while True; ret.append(out_queue1.get(block=False)) ; except Queue.Empty: ; pass. Semikolons zur Simulation von Zeilenumbrüchen.
Sastorsl
6

Meine Lösung für das Problem besteht darin, die Funktion und den Thread in eine Klasse einzubinden. Erfordert keine Pools, Warteschlangen oder Variablenübergabe vom Typ c. Es ist auch nicht blockierend. Sie überprüfen stattdessen den Status. Siehe Beispiel für die Verwendung am Ende des Codes.

import threading

class ThreadWorker():
    '''
    The basic idea is given a function create an object.
    The object can then run the function in a thread.
    It provides a wrapper to start it,check its status,and get data out the function.
    '''
    def __init__(self,func):
        self.thread = None
        self.data = None
        self.func = self.save_data(func)

    def save_data(self,func):
        '''modify function to save its returned data'''
        def new_func(*args, **kwargs):
            self.data=func(*args, **kwargs)

        return new_func

    def start(self,params):
        self.data = None
        if self.thread is not None:
            if self.thread.isAlive():
                return 'running' #could raise exception here

        #unless thread exists and is alive start or restart it
        self.thread = threading.Thread(target=self.func,args=params)
        self.thread.start()
        return 'started'

    def status(self):
        if self.thread is None:
            return 'not_started'
        else:
            if self.thread.isAlive():
                return 'running'
            else:
                return 'finished'

    def get_results(self):
        if self.thread is None:
            return 'not_started' #could return exception
        else:
            if self.thread.isAlive():
                return 'running'
            else:
                return self.data

def add(x,y):
    return x +y

add_worker = ThreadWorker(add)
print add_worker.start((1,2,))
print add_worker.status()
print add_worker.get_results()
Peter Lonjers
quelle
Wie würden Sie mit einer Ausnahme umgehen? Nehmen wir an, die Funktion add wurde angegeben und int und a str. Würden alle Threads fehlschlagen oder würde nur einer fehlschlagen?
user1745713
4

joinImmer zurückkehren None, ich denke, Sie sollten Unterklasse Thread, um Rückkehrcodes und so zu behandeln.

BrainStorm
quelle
4

Unter Berücksichtigung des @iman- Kommentars zur @ JakeBiesinger- Antwort habe ich es neu zusammengestellt, um eine unterschiedliche Anzahl von Threads zu haben:

from multiprocessing.pool import ThreadPool

def foo(bar, baz):
    print 'hello {0}'.format(bar)
    return 'foo' + baz

numOfThreads = 3 
results = []

pool = ThreadPool(numOfThreads)

for i in range(0, numOfThreads):
    results.append(pool.apply_async(foo, ('world', 'foo'))) # tuple of args for foo)

# do some other stuff in the main process
# ...
# ...

results = [r.get() for r in results]
print results

pool.close()
pool.join()

Prost,

Kerl.

Guy Avraham
quelle
2

Sie können eine Variable über dem Bereich der Thread-Funktion definieren und das Ergebnis hinzufügen. (Ich habe den Code auch so geändert, dass er mit Python3 kompatibel ist.)

returns = {}
def foo(bar):
    print('hello {0}'.format(bar))
    returns[bar] = 'foo'

from threading import Thread
t = Thread(target=foo, args=('world!',))
t.start()
t.join()
print(returns)

Dies kehrt zurück {'world!': 'foo'}

Wenn Sie die Funktionseingabe als Schlüssel für Ihr Ergebnisdiktat verwenden, wird garantiert, dass jede einzelne Eingabe einen Eintrag in die Ergebnisse liefert

Thijs D.
quelle
2

Ich verwende diesen Wrapper, der jede Funktion zum Ausführen in a bequem umdreht Threadund sich um ihren Rückgabewert oder ihre Ausnahme kümmert. Es fügt keinen QueueOverhead hinzu.

def threading_func(f):
    """Decorator for running a function in a thread and handling its return
    value or exception"""
    def start(*args, **kw):
        def run():
            try:
                th.ret = f(*args, **kw)
            except:
                th.exc = sys.exc_info()
        def get(timeout=None):
            th.join(timeout)
            if th.exc:
                raise th.exc[0], th.exc[1], th.exc[2] # py2
                ##raise th.exc[1] #py3                
            return th.ret
        th = threading.Thread(None, run)
        th.exc = None
        th.get = get
        th.start()
        return th
    return start

Anwendungsbeispiele

def f(x):
    return 2.5 * x
th = threading_func(f)(4)
print("still running?:", th.is_alive())
print("result:", th.get(timeout=1.0))

@threading_func
def th_mul(a, b):
    return a * b
th = th_mul("text", 2.5)

try:
    print(th.get())
except TypeError:
    print("exception thrown ok.")

Hinweise zum threadingModul

Komfortable Rückgabewerte und Ausnahmebehandlung einer Thread-Funktion sind ein häufiges "Pythonic" -Bedarf und sollten vom threadingModul bereits angeboten werden - möglicherweise direkt in der Standardklasse Thread. ThreadPoolhat viel zu viel Overhead für einfache Aufgaben - 3 Threads verwalten, viel Bürokratie. Leider wurde Threaddas Layout ursprünglich von Java kopiert - was Sie zB aus dem noch nutzlosen 1. (!) Konstruktorparameter sehen group.

kxr
quelle
Der erste Konstruktor ist nicht nutzlos, er ist dort für zukünftige Implementierungen reserviert. Aus dem Python Parallel Programming Cookbook
Vijay Shanker
1

Definieren Sie Ihr Ziel auf
1) nehmen Sie ein Argument q
2) ersetzen Sie alle Anweisungen return foodurchq.put(foo); return

also eine Funktion

def func(a):
    ans = a * a
    return ans

würde werden

def func(a, q):
    ans = a * a
    q.put(ans)
    return

und dann würden Sie als solche fortfahren

from Queue import Queue
from threading import Thread

ans_q = Queue()
arg_tups = [(i, ans_q) for i in xrange(10)]

threads = [Thread(target=func, args=arg_tup) for arg_tup in arg_tups]
_ = [t.start() for t in threads]
_ = [t.join() for t in threads]
results = [q.get() for _ in xrange(len(threads))]

Und Sie können Funktionsdekoratoren / Wrapper verwenden, um es so zu gestalten, dass Sie Ihre vorhandenen Funktionen verwenden können, targetohne sie zu ändern, aber folgen Sie diesem Grundschema.

tscizzle
quelle
Es sollte seinresults = [ans_q.get() for _ in xrange(len(threads))]
Hemant H Kumar
1

Wie bereits erwähnt, ist der Multiprocessing-Pool viel langsamer als das grundlegende Threading. Die Verwendung von Warteschlangen, wie in einigen Antworten hier vorgeschlagen, ist eine sehr effektive Alternative. Ich habe es mit Wörterbüchern verwendet, um viele kleine Threads ausführen und mehrere Antworten wiederherstellen zu können, indem ich sie mit Wörterbüchern kombiniere:

#!/usr/bin/env python3

import threading
# use Queue for python2
import queue
import random

LETTERS = 'abcdefghijklmnopqrstuvwxyz'
LETTERS = [ x for x in LETTERS ]

NUMBERS = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

def randoms(k, q):
    result = dict()
    result['letter'] = random.choice(LETTERS)
    result['number'] = random.choice(NUMBERS)
    q.put({k: result})

threads = list()
q = queue.Queue()
results = dict()

for name in ('alpha', 'oscar', 'yankee',):
    threads.append( threading.Thread(target=randoms, args=(name, q)) )
    threads[-1].start()
_ = [ t.join() for t in threads ]
while not q.empty():
    results.update(q.get())

print(results)
Yves Dorfsman
quelle
1

Die Idee von GuySoft ist großartig, aber ich denke, das Objekt muss nicht unbedingt von Thread erben und start () könnte von der Schnittstelle entfernt werden:

from threading import Thread
import queue
class ThreadWithReturnValue(object):
    def __init__(self, target=None, args=(), **kwargs):
        self._que = queue.Queue()
        self._t = Thread(target=lambda q,arg1,kwargs1: q.put(target(*arg1, **kwargs1)) ,
                args=(self._que, args, kwargs), )
        self._t.start()

    def join(self):
        self._t.join()
        return self._que.get()


def foo(bar):
    print('hello {0}'.format(bar))
    return "foo"

twrv = ThreadWithReturnValue(target=foo, args=('world!',))

print(twrv.join())   # prints foo
pandy.song
quelle
0

Eine übliche Lösung besteht darin, Ihre Funktion foomit einem Dekorateur wie zu verpacken

result = queue.Queue()

def task_wrapper(*args):
    result.put(target(*args))

Dann sieht der ganze Code vielleicht so aus

result = queue.Queue()

def task_wrapper(*args):
    result.put(target(*args))

threads = [threading.Thread(target=task_wrapper, args=args) for args in args_list]

for t in threads:
    t.start()
    while(True):
        if(len(threading.enumerate()) < max_num):
            break
for t in threads:
    t.join()
return result

Hinweis

Ein wichtiges Problem ist, dass die Rückgabewerte möglicherweise ungeordnet sind . (Tatsächlich wird das return valuenicht unbedingt im gespeichert queue, da Sie eine beliebige thread-sichere Datenstruktur wählen können )

Antwort777
quelle
0

Warum nicht einfach eine globale Variable verwenden?

import threading


class myThread(threading.Thread):
    def __init__(self, ind, lock):
        threading.Thread.__init__(self)
        self.ind = ind
        self.lock = lock

    def run(self):
        global results
        with self.lock:
            results.append(self.ind)



results = []
lock = threading.Lock()
threads = [myThread(x, lock) for x in range(1, 4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(results)
Alexey
quelle
0

Kindalls Antwort in Python3

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs={}, *, daemon=None):
        Thread.__init__(self, group, target, name, args, kwargs, daemon)
        self._return = None 

    def run(self):
        try:
            if self._target:
                self._return = self._target(*self._args, **self._kwargs)
        finally:
            del self._target, self._args, self._kwargs 

    def join(self,timeout=None):
        Thread.join(self,timeout)
        return self._return
SmartManoj
quelle
-2

Wenn nur True oder False aus dem Aufruf einer Funktion überprüft werden soll, besteht eine einfachere Lösung darin, eine globale Liste zu aktualisieren.

import threading

lists = {"A":"True", "B":"True"}

def myfunc(name: str, mylist):
    for i in mylist:
        if i == 31:
            lists[name] = "False"
            return False
        else:
            print("name {} : {}".format(name, i))

t1 = threading.Thread(target=myfunc, args=("A", [1, 2, 3, 4, 5, 6], ))
t2 = threading.Thread(target=myfunc, args=("B", [11, 21, 31, 41, 51, 61], ))
t1.start()
t2.start()
t1.join()
t2.join()

for value in lists.values():
    if value == False:
        # Something is suspicious 
        # Take necessary action 

Dies ist hilfreicher, wenn Sie herausfinden möchten, ob einer der Threads einen falschen Status zurückgegeben hat, um die erforderlichen Maßnahmen zu ergreifen.

Sravya
quelle